// obeli_sk_executor/executor.rs
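//! Executor task: drives pending executions to completion.
//!
//! An [`ExecTask`] repeatedly locks a batch of pending executions via
//! `DbConnection::lock_pending`, runs the configured [`Worker`] for each of them in a
//! spawned task, and appends the outcome (finished, temporarily failed, unlocked, ...)
//! back to the execution log, notifying the parent execution when a child finishes.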

use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
use assert_matches::assert_matches;
use chrono::{DateTime, Utc};
use concepts::storage::{
    AppendRequest, DbPool, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
    LockedExecution,
};
use concepts::{prefixed_ulid::ExecutorId, ExecutionId, FunctionFqn};
use concepts::{
    storage::{DbConnection, DbError, ExecutionEventInner, JoinSetResponse, Version},
    FinishedExecutionError,
};
use concepts::{ComponentId, FinishedExecutionResult, FunctionMetadata, StrVariant};
use concepts::{JoinSetId, PermanentFailureKind};
use std::marker::PhantomData;
use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    time::Duration,
};
use tokio::task::{AbortHandle, JoinHandle};
use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument, Level, Span};
use utils::time::ClockFn;

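/// Configuration of a single executor task.
///
/// `lock_expiry` bounds how long a locked execution may run before the lock can be
/// reclaimed by the expired timers watcher, `tick_sleep` is the idle back-off passed to
/// `wait_for_pending` between ticks, `batch_size` caps how many pending executions one
/// tick may lock, and `task_limiter` optionally bounds the number of concurrently
/// running worker tasks.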
#[derive(Debug, Clone)]
pub struct ExecConfig {
    pub lock_expiry: Duration,
    pub tick_sleep: Duration,
    pub batch_size: u32,
    pub component_id: ComponentId,
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
}

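/// A single executor: locks pending executions matching `ffqns` and runs them on
/// `worker`, persisting results through `db_pool`.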
pub struct ExecTask<C: ClockFn, DB: DbConnection, P: DbPool<DB>> {
    worker: Arc<dyn Worker>,
    config: ExecConfig,
    clock_fn: C, // Used for obtaining current time when the execution finishes.
    db_pool: P,
    executor_id: ExecutorId,
    phantom_data: PhantomData<DB>,
    ffqns: Arc<[FunctionFqn]>,
}

#[derive(derive_more::Debug, Default)]
pub struct ExecutionProgress {
    #[debug(skip)]
    #[allow(dead_code)]
    executions: Vec<(ExecutionId, JoinHandle<()>)>,
}

impl ExecutionProgress {
    #[cfg(feature = "test")]
    pub async fn wait_for_tasks(self) -> Result<usize, tokio::task::JoinError> {
        let execs = self.executions.len();
        for (_, handle) in self.executions {
            handle.await?;
        }
        Ok(execs)
    }
}

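/// Handle to an executor task spawned by [`ExecTask::spawn_new`].
///
/// [`ExecutorTaskHandle::close`] requests a graceful shutdown and waits for the loop to
/// exit; dropping the handle without closing aborts the task instead.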
#[derive(derive_more::Debug)]
pub struct ExecutorTaskHandle {
    #[debug(skip)]
    is_closing: Arc<AtomicBool>,
    #[debug(skip)]
    abort_handle: AbortHandle,
    component_id: ComponentId,
    executor_id: ExecutorId,
}

impl ExecutorTaskHandle {
    #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
    pub async fn close(&self) {
        trace!("Gracefully closing");
        self.is_closing.store(true, Ordering::Relaxed);
        while !self.abort_handle.is_finished() {
            tokio::time::sleep(Duration::from_millis(1)).await;
        }
        debug!("Gracefully closed");
    }
}

impl Drop for ExecutorTaskHandle {
    #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
    fn drop(&mut self) {
        if self.abort_handle.is_finished() {
            return;
        }
        warn!(executor_id= %self.executor_id, component_id=%self.component_id, "Aborting the executor task");
        self.abort_handle.abort();
    }
}

pub(crate) fn extract_ffqns(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    worker
        .exported_functions()
        .iter()
        .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
        .collect::<Arc<_>>()
}

impl<C: ClockFn + 'static, DB: DbConnection + 'static, P: DbPool<DB> + 'static> ExecTask<C, DB, P> {
    #[cfg(feature = "test")]
    pub fn new(
        worker: Arc<dyn Worker>,
        config: ExecConfig,
        clock_fn: C,
        db_pool: P,
        ffqns: Arc<[FunctionFqn]>,
    ) -> Self {
        Self {
            worker,
            config,
            executor_id: ExecutorId::generate(),
            db_pool,
            phantom_data: PhantomData,
            ffqns,
            clock_fn,
        }
    }

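    /// Spawn the executor loop on the Tokio runtime and return a handle that can be used
    /// to close (or, on drop, abort) it.
    ///
    /// A minimal usage sketch, assuming a `worker`, a `db_pool`, a `component_id` and a
    /// clock implementing `ClockFn` are already available (the bindings below are
    /// illustrative, not part of this API):
    ///
    /// ```ignore
    /// let handle = ExecTask::spawn_new(
    ///     worker,
    ///     ExecConfig {
    ///         lock_expiry: Duration::from_secs(1),
    ///         tick_sleep: Duration::from_millis(100),
    ///         batch_size: 5,
    ///         component_id,
    ///         task_limiter: None,
    ///     },
    ///     clock_fn,
    ///     db_pool,
    ///     ExecutorId::generate(),
    /// );
    /// // ... later, shut down gracefully:
    /// handle.close().await;
    /// ```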
    pub fn spawn_new(
        worker: Arc<dyn Worker>,
        config: ExecConfig,
        clock_fn: C,
        db_pool: P,
        executor_id: ExecutorId,
    ) -> ExecutorTaskHandle {
        let is_closing = Arc::new(AtomicBool::default());
        let is_closing_inner = is_closing.clone();
        let ffqns = extract_ffqns(worker.as_ref());
        let component_id = config.component_id.clone();
        let abort_handle = tokio::spawn(async move {
            debug!(%executor_id, component_id = %config.component_id, "Spawned executor");
            let task = Self {
                worker,
                config,
                executor_id,
                db_pool,
                phantom_data: PhantomData,
                ffqns: ffqns.clone(),
                clock_fn: clock_fn.clone(),
            };
            loop {
                let _ = task.tick(clock_fn.now()).await;
                let executed_at = clock_fn.now();
                task.db_pool
                    .connection()
                    .wait_for_pending(executed_at, ffqns.clone(), task.config.tick_sleep)
                    .await;
                if is_closing_inner.load(Ordering::Relaxed) {
                    return;
                }
            }
        })
        .abort_handle();
        ExecutorTaskHandle {
            is_closing,
            abort_handle,
            component_id,
            executor_id,
        }
    }

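    /// Return up to `batch_size` permits. With a `task_limiter`, only the permits that
    /// can be acquired right now are returned (possibly none); without one, `batch_size`
    /// `None` placeholders are returned so that only the batch size limits concurrency.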
    fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
        if let Some(task_limiter) = &self.config.task_limiter {
            let mut locks = Vec::new();
            for _ in 0..self.config.batch_size {
                if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
                    locks.push(Some(permit));
                } else {
                    break;
                }
            }
            locks
        } else {
            let mut vec = Vec::with_capacity(self.config.batch_size as usize);
            for _ in 0..self.config.batch_size {
                vec.push(None);
            }
            vec
        }
    }

    #[cfg(feature = "test")]
    pub async fn tick2(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
        self.tick(executed_at).await
    }

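    /// One executor tick: acquire permits, lock up to that many pending executions and
    /// spawn a detached task per locked execution that runs the worker and appends the
    /// outcome. The returned [`ExecutionProgress`] holds the join handles of those tasks.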
    #[instrument(level = Level::TRACE, name = "executor.tick", skip_all, fields(executor_id = %self.executor_id, component_id = %self.config.component_id))]
    async fn tick(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
        let locked_executions = {
            let mut permits = self.acquire_task_permits();
            if permits.is_empty() {
                return Ok(ExecutionProgress::default());
            }
            let db_connection = self.db_pool.connection();
            let lock_expires_at = executed_at + self.config.lock_expiry;
            let locked_executions = db_connection
                .lock_pending(
                    permits.len(), // batch size
                    executed_at,   // fetch expiring before now
                    self.ffqns.clone(),
                    executed_at, // created at
                    self.config.component_id.clone(),
                    self.executor_id,
                    lock_expires_at,
                )
                .await
                .map_err(|err| {
                    warn!(executor_id = %self.executor_id, component_id = %self.config.component_id, "lock_pending error {err:?}");
                })?;
            // Drop permits if too many were allocated.
            while permits.len() > locked_executions.len() {
                permits.pop();
            }
            assert_eq!(permits.len(), locked_executions.len());
            locked_executions.into_iter().zip(permits)
        };
        let execution_deadline = executed_at + self.config.lock_expiry;

        let mut executions = Vec::with_capacity(locked_executions.len());
        for (locked_execution, permit) in locked_executions {
            let execution_id = locked_execution.execution_id.clone();
            let join_handle = {
                let worker = self.worker.clone();
                let db_pool = self.db_pool.clone();
                let clock_fn = self.clock_fn.clone();
                let run_id = locked_execution.run_id;
                let worker_span = info_span!(parent: None, "worker",
                    %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.executor_id, component_id = %self.config.component_id);
                locked_execution.metadata.enrich(&worker_span);
                tokio::spawn({
                    let worker_span2 = worker_span.clone();
                    async move {
                        let res = Self::run_worker(
                            worker,
                            &db_pool,
                            execution_deadline,
                            clock_fn,
                            locked_execution,
                            worker_span2,
                        )
                        .await;
                        if let Err(db_error) = res {
                            error!("Execution will be timed out, not writing `{db_error:?}`");
                        }
                        drop(permit);
                    }
                    .instrument(worker_span)
                })
            };
            executions.push((execution_id, join_handle));
        }
        Ok(ExecutionProgress { executions })
    }

    async fn run_worker(
        worker: Arc<dyn Worker>,
        db_pool: &P,
        execution_deadline: DateTime<Utc>,
        clock_fn: C,
        locked_execution: LockedExecution,
        worker_span: Span,
    ) -> Result<(), DbError> {
        debug!("Worker::run starting");
        trace!(
            version = %locked_execution.version,
            params = ?locked_execution.params,
            event_history = ?locked_execution.event_history,
            "Worker::run starting"
        );
        let can_be_retried = ExecutionLog::can_be_retried_after(
            locked_execution.temporary_event_count + 1,
            locked_execution.max_retries,
            locked_execution.retry_exp_backoff,
        );
        let unlock_expiry_on_limit_reached =
            ExecutionLog::compute_retry_duration_when_retrying_forever(
                locked_execution.temporary_event_count + 1,
                locked_execution.retry_exp_backoff,
            );
        let ctx = WorkerContext {
            execution_id: locked_execution.execution_id.clone(),
            metadata: locked_execution.metadata,
            ffqn: locked_execution.ffqn,
            params: locked_execution.params,
            event_history: locked_execution.event_history,
            responses: locked_execution
                .responses
                .into_iter()
                .map(|outer| outer.event)
                .collect(),
            version: locked_execution.version,
            execution_deadline,
            can_be_retried: can_be_retried.is_some(),
            run_id: locked_execution.run_id,
            worker_span,
        };
        let worker_result = worker.run(ctx).await;
        trace!(?worker_result, "Worker::run finished");
        let result_obtained_at = clock_fn.now();
        match Self::worker_result_to_execution_event(
            locked_execution.execution_id,
            worker_result,
            result_obtained_at,
            locked_execution.parent,
            can_be_retried,
            unlock_expiry_on_limit_reached,
        )? {
            Some(append) => {
                let db_connection = db_pool.connection();
                trace!("Appending {append:?}");
                append.clone().append(&db_connection).await
            }
            None => Ok(()),
        }
    }

    // FIXME: On a slow execution: race between `expired_timers_watcher` and this executor if `retry_exp_backoff` is 0.
    /// Map the `WorkerError` to a temporary or a permanent failure.
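    ///
    /// In short: `Ok` appends `Finished`, `DbUpdatedByWorker` appends nothing,
    /// `TemporaryTimeout` is left to the expired timers watcher, a `DbError` is
    /// propagated, retryable traps become `TemporarilyFailed`, `LimitReached` becomes
    /// `Unlocked`, and exhausted retries or fatal errors become a permanent
    /// `Finished(Err(..))` that also notifies the parent execution, if any.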
    #[expect(clippy::too_many_lines)]
    fn worker_result_to_execution_event(
        execution_id: ExecutionId,
        worker_result: WorkerResult,
        result_obtained_at: DateTime<Utc>,
        parent: Option<(ExecutionId, JoinSetId)>,
        can_be_retried: Option<Duration>,
        unlock_expiry_on_limit_reached: Duration,
    ) -> Result<Option<Append>, DbError> {
        Ok(match worker_result {
            WorkerResult::Ok(result, new_version) => {
                info!(
                    "Execution finished: {}",
                    result.as_pending_state_finished_result()
                );
                let child_finished =
                    parent.map(
                        |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
                            parent_execution_id,
                            parent_join_set,
                            result: Ok(result.clone()),
                        },
                    );
                let primary_event = AppendRequest {
                    created_at: result_obtained_at,
                    event: ExecutionEventInner::Finished { result: Ok(result) },
                };

                Some(Append {
                    created_at: result_obtained_at,
                    primary_event,
                    execution_id,
                    version: new_version,
                    child_finished,
                })
            }
            WorkerResult::DbUpdatedByWorker => None,
            WorkerResult::Err(err) => {
                let reason = err.to_string();
                let (primary_event, child_finished, version) = match err {
                    WorkerError::TemporaryTimeout => {
                        info!("Temporary timeout");
                        // Will be updated by `expired_timers_watcher`.
                        return Ok(None);
                    }
                    WorkerError::DbError(db_error) => {
                        return Err(db_error);
                    }
                    WorkerError::ActivityTrap {
                        reason: reason_inner,
                        trap_kind,
                        detail,
                        version,
                    } => {
                        if let Some(duration) = can_be_retried {
                            let expires_at = result_obtained_at + duration;
                            debug!("Retrying activity {trap_kind} execution after {duration:?} at {expires_at}");
                            (
                                ExecutionEventInner::TemporarilyFailed {
                                    backoff_expires_at: expires_at,
                                    reason_full: StrVariant::from(reason),
                                    reason_inner: StrVariant::from(reason_inner),
                                    detail: Some(detail),
                                },
                                None,
                                version,
                            )
                        } else {
                            info!(
                                "Activity {trap_kind} marked as permanent failure - {reason_inner}"
                            );
                            let result = Err(FinishedExecutionError::PermanentFailure {
                                reason_inner,
                                reason_full: reason,
                                kind: PermanentFailureKind::ActivityTrap,
                                detail: Some(detail),
                            });
                            let child_finished =
                                parent.map(|(parent_execution_id, parent_join_set)| {
                                    ChildFinishedResponse {
                                        parent_execution_id,
                                        parent_join_set,
                                        result: result.clone(),
                                    }
                                });
                            (
                                ExecutionEventInner::Finished { result },
                                child_finished,
                                version,
                            )
                        }
                    }
                    WorkerError::ActivityReturnedError { detail, version } => {
                        let duration = can_be_retried.expect(
                            "ActivityReturnedError must not be returned when retries are exhausted",
                        );
                        let expires_at = result_obtained_at + duration;
                        debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
                        (
                            ExecutionEventInner::TemporarilyFailed {
                                backoff_expires_at: expires_at,
                                reason_full: StrVariant::Static("activity returned error"),
                                reason_inner: StrVariant::Static("activity returned error"),
                                detail,
                            },
                            None,
                            version,
                        )
                    }
                    WorkerError::TemporaryWorkflowTrap {
                        reason: reason_inner,
                        kind,
                        detail,
                        version,
                    } => {
                        let duration = can_be_retried.expect("workflows are retried forever");
                        let expires_at = result_obtained_at + duration;
                        debug!(
                            "Retrying workflow {kind} execution after {duration:?} at {expires_at}"
                        );
                        (
                            ExecutionEventInner::TemporarilyFailed {
                                backoff_expires_at: expires_at,
                                reason_full: StrVariant::from(reason),
                                reason_inner: StrVariant::from(reason_inner),
                                detail,
                            },
                            None,
                            version,
                        )
                    }
                    WorkerError::LimitReached {
                        reason: inner_reason,
                        version: new_version,
                    } => {
                        let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
                        warn!("Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}");
                        (
                            ExecutionEventInner::Unlocked {
                                backoff_expires_at: expires_at,
                                reason: StrVariant::from(reason),
                            },
                            None,
                            new_version,
                        )
                    }
                    WorkerError::FatalError(fatal_error, version) => {
                        info!("Fatal worker error - {fatal_error:?}");
                        let result = Err(FinishedExecutionError::from(fatal_error));
                        let child_finished =
                            parent.map(|(parent_execution_id, parent_join_set)| {
                                ChildFinishedResponse {
                                    parent_execution_id,
                                    parent_join_set,
                                    result: result.clone(),
                                }
                            });
                        (
                            ExecutionEventInner::Finished { result },
                            child_finished,
                            version,
                        )
                    }
                };
                Some(Append {
                    created_at: result_obtained_at,
                    primary_event: AppendRequest {
                        created_at: result_obtained_at,
                        event: primary_event,
                    },
                    execution_id,
                    version,
                    child_finished,
                })
            }
        })
    }
}

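/// Result of a finished child execution, delivered to the parent's join set.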
#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    pub(crate) parent_execution_id: ExecutionId,
    pub(crate) parent_join_set: JoinSetId,
    pub(crate) result: FinishedExecutionResult,
}

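/// Everything needed to persist the outcome of one worker run: the primary event, the
/// expected version and, for child executions, the response to send to the parent.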
#[derive(Debug, Clone)]
pub(crate) struct Append {
    pub(crate) created_at: DateTime<Utc>,
    pub(crate) primary_event: AppendRequest,
    pub(crate) execution_id: ExecutionId,
    pub(crate) version: Version,
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}

impl Append {
    pub(crate) async fn append(self, db_connection: &impl DbConnection) -> Result<(), DbError> {
        if let Some(child_finished) = self.child_finished {
            assert_matches!(
                &self.primary_event,
                AppendRequest {
                    event: ExecutionEventInner::Finished { .. },
                    ..
                }
            );
            let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
            db_connection
                .append_batch_respond_to_parent(
                    derived.clone(),
                    self.created_at,
                    vec![self.primary_event],
                    self.version.clone(),
                    child_finished.parent_execution_id,
                    JoinSetResponseEventOuter {
                        created_at: self.created_at,
                        event: JoinSetResponseEvent {
                            join_set_id: child_finished.parent_join_set,
                            event: JoinSetResponse::ChildExecutionFinished {
                                child_execution_id: derived,
                                // Since self.primary_event is a finished event, the version will remain the same.
                                finished_version: self.version,
                                result: child_finished.result,
                            },
                        },
                    },
                )
                .await?;
        } else {
            db_connection
                .append_batch(
                    self.created_at,
                    vec![self.primary_event],
                    self.execution_id,
                    self.version,
                )
                .await?;
        }
        Ok(())
    }
}

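/// Test-only worker that replays pre-canned `WorkerResult`s keyed by the expected
/// `Version`, asserting that the observed version and event history match.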
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        storage::{HistoryEvent, Version},
        FunctionFqn, FunctionMetadata, ParameterTypes,
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    impl SimpleWorker {
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(IndexMap::from([(
                Version::new(2),
                (vec![], res),
            )]))))
        }

        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            Self {
                worker_results_rev: self.worker_results_rev,
                exported: [FunctionMetadata {
                    ffqn: ffqn.clone(),
                    parameter_types: ParameterTypes::default(),
                    return_type: None,
                    extension: None,
                    submittable: true,
                }],
                ffqn,
            }
        }

        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: [FunctionMetadata {
                    ffqn: FFQN_SOME,
                    parameter_types: ParameterTypes::default(),
                    return_type: None,
                    extension: None,
                    submittable: true,
                }],
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            let (expected_version, (expected_eh, worker_result)) =
                self.worker_results_rev.lock().unwrap().pop().unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }

        fn imported_functions(&self) -> &[FunctionMetadata] {
            &[]
        }
    }
}

#[cfg(test)]
mod tests {
    use self::simple_worker::SimpleWorker;
    use super::*;
    use crate::worker::FatalError;
    use crate::{expired_timers_watcher, worker::WorkerResult};
    use assert_matches::assert_matches;
    use async_trait::async_trait;
    use concepts::storage::{CreateRequest, JoinSetRequest};
    use concepts::storage::{
        DbConnection, ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState,
    };
    use concepts::{
        FunctionMetadata, JoinSetKind, ParameterTypes, Params, StrVariant,
        SupportedFunctionReturnValue, TrapKind,
    };
    use db_tests::Database;
    use indexmap::IndexMap;
    use simple_worker::FFQN_SOME;
    use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
    use test_utils::set_up;
    use test_utils::sim_clock::{ConstClock, SimClock};
    use utils::time::Now;

    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");

    async fn tick_fn<
        W: Worker + Debug,
        C: ClockFn + 'static,
        DB: DbConnection + 'static,
        P: DbPool<DB> + 'static,
    >(
        config: ExecConfig,
        clock_fn: C,
        db_pool: P,
        worker: Arc<W>,
        executed_at: DateTime<Utc>,
    ) -> ExecutionProgress {
        trace!("Ticking with {worker:?}");
        let ffqns = super::extract_ffqns(worker.as_ref());
        let executor = ExecTask::new(worker, config, clock_fn, db_pool, ffqns);
        let mut execution_progress = executor.tick(executed_at).await.unwrap();
        loop {
            execution_progress
                .executions
                .retain(|(_, abort_handle)| !abort_handle.is_finished());
            if execution_progress.executions.is_empty() {
                return execution_progress;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    }

    #[tokio::test]
    async fn execute_simple_lifecycle_tick_based_mem() {
        let created_at = Now.now();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
        db_pool.close().await.unwrap();
    }

    #[cfg(not(madsim))]
    #[tokio::test]
    async fn execute_simple_lifecycle_tick_based_sqlite() {
        let created_at = Now.now();
        let (_guard, db_pool) = Database::Sqlite.set_up().await;
        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
        db_pool.close().await.unwrap();
    }

    async fn execute_simple_lifecycle_tick_based<
        DB: DbConnection + 'static,
        P: DbPool<DB> + 'static,
        C: ClockFn + 'static,
    >(
        pool: P,
        clock_fn: C,
    ) {
        set_up();
        let created_at = clock_fn.now();
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::from_millis(100),
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };

        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            pool,
            exec_config,
            Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
                SupportedFunctionReturnValue::None,
                Version::new(2),
            ))),
            tick_fn,
        )
        .await;
        assert_matches!(
            execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Ok(SupportedFunctionReturnValue::None),
                },
                created_at: _,
            }
        );
    }

    #[tokio::test]
    async fn stochastic_execute_simple_lifecycle_task_based_mem() {
        set_up();
        let created_at = Now.now();
        let clock_fn = ConstClock(created_at);
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };

        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
            SupportedFunctionReturnValue::None,
            Version::new(2),
        )));
        let exec_task = ExecTask::spawn_new(
            worker.clone(),
            exec_config.clone(),
            clock_fn,
            db_pool.clone(),
            ExecutorId::generate(),
        );

        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            db_pool.clone(),
            exec_config,
            worker,
            |_, _, _, _, _| async {
                tokio::time::sleep(Duration::from_secs(1)).await; // non-deterministic if not run in madsim
                ExecutionProgress::default()
            },
        )
        .await;
        exec_task.close().await;
        db_pool.close().await.unwrap();
        assert_matches!(
            execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Ok(SupportedFunctionReturnValue::None),
                },
                created_at: _,
            }
        );
    }

    struct CreateAndTickConfig {
        execution_id: ExecutionId,
        created_at: DateTime<Utc>,
        max_retries: u32,
        executed_at: DateTime<Utc>,
        retry_exp_backoff: Duration,
    }

    async fn create_and_tick<
        W: Worker,
        C: ClockFn,
        DB: DbConnection,
        P: DbPool<DB>,
        T: FnMut(ExecConfig, C, P, Arc<W>, DateTime<Utc>) -> F,
        F: Future<Output = ExecutionProgress>,
    >(
        config: CreateAndTickConfig,
        clock_fn: C,
        db_pool: P,
        exec_config: ExecConfig,
        worker: Arc<W>,
        mut tick: T,
    ) -> ExecutionLog {
        // Create an execution
        let db_connection = db_pool.connection();
        db_connection
            .create(CreateRequest {
                created_at: config.created_at,
                execution_id: config.execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: config.created_at,
                retry_exp_backoff: config.retry_exp_backoff,
                max_retries: config.max_retries,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();
        // execute!
        tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
        debug!("Execution history after tick: {execution_log:?}");
        // check that DB contains Created and Locked events.
        assert_matches!(
            execution_log.events.first().unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Created { .. },
                created_at: actually_created_at,
            }
            if config.created_at == *actually_created_at
        );
        let locked_at = assert_matches!(
            execution_log.events.get(1).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: locked_at
            } if config.created_at <= *locked_at
            => *locked_at
        );
        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
            event: _,
            created_at: executed_at,
        } if *executed_at >= locked_at);
        execution_log
    }

    #[expect(clippy::too_many_lines)]
    #[tokio::test]
    async fn activity_trap_should_trigger_an_execution_retry() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };
        let expected_reason = "error reason";
        let expected_detail = "error detail";
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: expected_detail.to_string(),
                version: Version::new(2),
            },
        )));
        let retry_exp_backoff = Duration::from_millis(100);
        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at: sim_clock.now(),
                max_retries: 1,
                executed_at: sim_clock.now(),
                retry_exp_backoff,
            },
            sim_clock.clone(),
            db_pool.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        assert_eq!(3, execution_log.events.len());
        {
            let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
                &execution_log.events.get(2).unwrap(),
                ExecutionEvent {
                    event: ExecutionEventInner::TemporarilyFailed {
                        reason_inner,
                        reason_full,
                        detail,
                        backoff_expires_at,
                    },
                    created_at: at,
                }
                => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
            );
            assert_eq!(expected_reason, reason_inner.deref());
            assert_eq!(
                format!("activity trap: {expected_reason}"),
                reason_full.deref()
            );
            assert_eq!(Some(expected_detail), detail.as_deref());
            assert_eq!(at, sim_clock.now());
            assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
        }
        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
            std::sync::Mutex::new(IndexMap::from([(
                Version::new(4),
                (
                    vec![],
                    WorkerResult::Ok(SupportedFunctionReturnValue::None, Version::new(4)),
                ),
            )])),
        )));
        // noop until `retry_exp_backoff` expires
        assert!(tick_fn(
            exec_config.clone(),
            sim_clock.clone(),
            db_pool.clone(),
            worker.clone(),
            sim_clock.now(),
        )
        .await
        .executions
        .is_empty());
        // tick again to finish the execution
        sim_clock.move_time_forward(retry_exp_backoff).await;
        tick_fn(
            exec_config,
            sim_clock.clone(),
            db_pool.clone(),
            worker,
            sim_clock.now(),
        )
        .await;
        let execution_log = {
            let db_connection = db_pool.connection();
            db_connection
                .get(&execution_log.execution_id)
                .await
                .unwrap()
        };
        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
        assert_matches!(
            execution_log.events.get(3).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: at
            } if *at == sim_clock.now()
        );
        assert_matches!(
            execution_log.events.get(4).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Ok(SupportedFunctionReturnValue::None),
                },
                created_at: finished_at,
            } if *finished_at == sim_clock.now()
        );
        db_pool.close().await.unwrap();
    }

    #[tokio::test]
    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
        set_up();
        let created_at = Now.now();
        let clock_fn = ConstClock(created_at);
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };

        let expected_reason = "error reason";
        let expected_detail = "error detail";
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: expected_detail.to_string(),
                version: Version::new(2),
            },
        )));
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            db_pool.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        assert_eq!(3, execution_log.events.len());
        let (reason, kind, detail) = assert_matches!(
            &execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished{
                    result: Err(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_})
                },
                created_at: at,
            } if *at == created_at
            => (reason_inner, kind, detail)
        );
        assert_eq!(expected_reason, *reason);
        assert_eq!(Some(expected_detail), detail.as_deref());
        assert_eq!(PermanentFailureKind::ActivityTrap, *kind);

        db_pool.close().await.unwrap();
    }

    #[tokio::test]
    async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
        let worker_error = WorkerError::ActivityTrap {
            reason: "error reason".to_string(),
            trap_kind: TrapKind::Trap,
            detail: "detail".to_string(),
            version: Version::new(2),
        };
        let expected_child_err = FinishedExecutionError::PermanentFailure {
            reason_full: "activity trap: error reason".to_string(),
            reason_inner: "error reason".to_string(),
            kind: PermanentFailureKind::ActivityTrap,
            detail: Some("detail".to_string()),
        };
        child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
            .await;
    }

    #[tokio::test]
    async fn child_execution_permanently_failed_should_notify_parent_timeout() {
        let worker_error = WorkerError::TemporaryTimeout;
        let expected_child_err = FinishedExecutionError::PermanentTimeout;
        child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
            .await;
    }

    #[tokio::test]
    async fn child_execution_permanently_failed_should_notify_parent_unhandled_child() {
        let parent_id = ExecutionId::from_parts(1, 1);
        let join_set_id_outer =
            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("outer")).unwrap();
        let root_cause_id = parent_id.next_level(&join_set_id_outer);
        let join_set_id_inner =
            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("inner")).unwrap();
        let child_execution_id = root_cause_id.next_level(&join_set_id_inner);
        let worker_error = WorkerError::FatalError(
            FatalError::UnhandledChildExecutionError {
                child_execution_id: child_execution_id.clone(),
                root_cause_id: root_cause_id.clone(),
            },
            Version::new(2),
        );
        let expected_child_err = FinishedExecutionError::UnhandledChildExecutionError {
            child_execution_id,
            root_cause_id,
        };
        child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
            .await;
    }

    #[expect(clippy::too_many_lines)]
    async fn child_execution_permanently_failed_should_notify_parent(
        worker_error: WorkerError,
        expected_child_err: FinishedExecutionError,
    ) {
        use concepts::storage::JoinSetResponseEventOuter;
        const LOCK_EXPIRY: Duration = Duration::from_secs(1);

        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;

        let parent_worker = Arc::new(SimpleWorker::with_single_result(
            WorkerResult::DbUpdatedByWorker,
        ));
        let parent_execution_id = ExecutionId::generate();
        db_pool
            .connection()
            .create(CreateRequest {
                created_at: sim_clock.now(),
                execution_id: parent_execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: Duration::ZERO,
                max_retries: 0,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();
        tick_fn(
            ExecConfig {
                batch_size: 1,
                lock_expiry: LOCK_EXPIRY,
                tick_sleep: Duration::ZERO,
                component_id: ComponentId::dummy_activity(),
                task_limiter: None,
            },
            sim_clock.clone(),
            db_pool.clone(),
            parent_worker,
            sim_clock.now(),
        )
        .await;

        let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
        let child_execution_id = parent_execution_id.next_level(&join_set_id);
        // The executor does not append anything; the following should have been written by the worker:
        {
            let child = CreateRequest {
                created_at: sim_clock.now(),
                execution_id: ExecutionId::Derived(child_execution_id.clone()),
                ffqn: FFQN_CHILD,
                params: Params::empty(),
                parent: Some((parent_execution_id.clone(), join_set_id.clone())),
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: Duration::ZERO,
                max_retries: 0,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            };
            let current_time = sim_clock.now();
            let join_set = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinSet {
                        join_set_id: join_set_id.clone(),
                    },
                },
            };
            let child_exec_req = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinSetRequest {
                        join_set_id: join_set_id.clone(),
                        request: JoinSetRequest::ChildExecutionRequest {
                            child_execution_id: child_execution_id.clone(),
                        },
                    },
                },
            };
            let join_next = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinNext {
                        join_set_id: join_set_id.clone(),
                        run_expires_at: sim_clock.now(),
                        closing: false,
                    },
                },
            };
            db_pool
                .connection()
                .append_batch_create_new_execution(
                    current_time,
                    vec![join_set, child_exec_req, join_next],
                    parent_execution_id.clone(),
                    Version::new(2),
                    vec![child],
                )
                .await
                .unwrap();
        }

        let child_worker = Arc::new(
            SimpleWorker::with_single_result(WorkerResult::Err(worker_error)).with_ffqn(FFQN_CHILD),
        );

        // execute the child
        tick_fn(
            ExecConfig {
                batch_size: 1,
                lock_expiry: LOCK_EXPIRY,
                tick_sleep: Duration::ZERO,
                component_id: ComponentId::dummy_activity(),
                task_limiter: None,
            },
            sim_clock.clone(),
            db_pool.clone(),
            child_worker,
            sim_clock.now(),
        )
        .await;
        if expected_child_err == FinishedExecutionError::PermanentTimeout {
            // In case of timeout, let the timers watcher handle it
            sim_clock.move_time_forward(LOCK_EXPIRY).await;
            expired_timers_watcher::tick(db_pool.connection(), sim_clock.now())
                .await
                .unwrap();
        }
        let child_log = db_pool
            .connection()
            .get(&ExecutionId::Derived(child_execution_id.clone()))
            .await
            .unwrap();
        assert!(child_log.pending_state.is_finished());
        assert_eq!(
            Version(2),
            child_log.next_version,
            "created = 0, locked = 1, with_single_result = 2"
        );
        assert_eq!(
            ExecutionEventInner::Finished {
                result: Err(expected_child_err)
            },
            child_log.last_event().event
        );
        let parent_log = db_pool
            .connection()
            .get(&parent_execution_id)
            .await
            .unwrap();
        assert_matches!(
            parent_log.pending_state,
            PendingState::PendingAt {
                scheduled_at
            } if scheduled_at == sim_clock.now(),
            "parent should be back to pending"
        );
        let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
            parent_log.responses.last(),
            Some(JoinSetResponseEventOuter{
                created_at: at,
                event: JoinSetResponseEvent{
                    join_set_id: found_join_set_id,
                    event: JoinSetResponse::ChildExecutionFinished {
                        child_execution_id: found_child_execution_id,
                        finished_version,
                        result: found_result,
                    }
                }
            })
             if *at == sim_clock.now()
            => (found_join_set_id, found_child_execution_id, finished_version, found_result)
        );
        assert_eq!(join_set_id, *found_join_set_id);
        assert_eq!(child_execution_id, *found_child_execution_id);
        assert_eq!(child_log.next_version, *child_finished_version);
        assert!(found_result.is_err());

        db_pool.close().await.unwrap();
    }

    #[derive(Clone, Debug)]
    struct SleepyWorker {
        duration: Duration,
        result: SupportedFunctionReturnValue,
        exported: [FunctionMetadata; 1],
    }

    #[async_trait]
    impl Worker for SleepyWorker {
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            tokio::time::sleep(self.duration).await;
            WorkerResult::Ok(self.result.clone(), ctx.version)
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }

        fn imported_functions(&self) -> &[FunctionMetadata] {
            &[]
        }
    }

    #[expect(clippy::too_many_lines)]
    #[tokio::test]
    async fn hanging_lock_should_be_cleaned_and_execution_retried() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let lock_expiry = Duration::from_millis(100);
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry,
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };

        let worker = Arc::new(SleepyWorker {
            duration: lock_expiry + Duration::from_millis(1), // sleep more than allowed by the lock expiry
            result: SupportedFunctionReturnValue::None,
            exported: [FunctionMetadata {
                ffqn: FFQN_SOME,
                parameter_types: ParameterTypes::default(),
                return_type: None,
                extension: None,
                submittable: true,
            }],
        });
        // Create an execution
        let execution_id = ExecutionId::generate();
        let timeout_duration = Duration::from_millis(300);
        let db_connection = db_pool.connection();
        db_connection
            .create(CreateRequest {
                created_at: sim_clock.now(),
                execution_id: execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: timeout_duration,
                max_retries: 1,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();

        let ffqns = super::extract_ffqns(worker.as_ref());
        let executor = ExecTask::new(
            worker,
            exec_config,
            sim_clock.clone(),
            db_pool.clone(),
            ffqns,
        );
        let mut first_execution_progress = executor.tick(sim_clock.now()).await.unwrap();
        assert_eq!(1, first_execution_progress.executions.len());
        // Started hanging, wait for lock expiry.
        sim_clock.move_time_forward(lock_expiry).await;
        // cleanup should be called
        let now_after_first_lock_expiry = sim_clock.now();
        {
            debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
            let cleanup_progress = executor.tick(now_after_first_lock_expiry).await.unwrap();
            assert!(cleanup_progress.executions.is_empty());
        }
        {
            let expired_locks =
                expired_timers_watcher::tick(db_pool.connection(), now_after_first_lock_expiry)
                    .await
                    .unwrap()
                    .expired_locks;
            assert_eq!(1, expired_locks);
        }
        assert!(!first_execution_progress
            .executions
            .pop()
            .unwrap()
            .1
            .is_finished());

        let execution_log = db_connection.get(&execution_id).await.unwrap();
        let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
        assert_matches!(
            &execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at },
                created_at: at,
            } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
        );
        assert_eq!(
            PendingState::PendingAt {
                scheduled_at: expected_first_timeout_expiry
            },
            execution_log.pending_state
        );
        sim_clock.move_time_forward(timeout_duration).await;
        let now_after_first_timeout = sim_clock.now();
        debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");

        let mut second_execution_progress = executor.tick(now_after_first_timeout).await.unwrap();
        assert_eq!(1, second_execution_progress.executions.len());

        // Started hanging, wait for lock expiry.
        sim_clock.move_time_forward(lock_expiry).await;
        // cleanup should be called
        let now_after_second_lock_expiry = sim_clock.now();
        debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
        {
            let cleanup_progress = executor.tick(now_after_second_lock_expiry).await.unwrap();
            assert!(cleanup_progress.executions.is_empty());
        }
        {
            let expired_locks =
                expired_timers_watcher::tick(db_pool.connection(), now_after_second_lock_expiry)
                    .await
                    .unwrap()
                    .expired_locks;
            assert_eq!(1, expired_locks);
        }
        assert!(!second_execution_progress
            .executions
            .pop()
            .unwrap()
            .1
            .is_finished());

        drop(db_connection);
        drop(executor);
        db_pool.close().await.unwrap();
    }
}