obeli_sk_executor/
executor.rs

1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::storage::{
5    AppendRequest, DbPool, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
6    LockedExecution,
7};
8use concepts::time::ClockFn;
9use concepts::{prefixed_ulid::ExecutorId, ExecutionId, FunctionFqn};
10use concepts::{
11    storage::{DbConnection, DbError, ExecutionEventInner, JoinSetResponse, Version},
12    FinishedExecutionError,
13};
14use concepts::{ComponentId, FinishedExecutionResult, FunctionMetadata, StrVariant};
15use concepts::{JoinSetId, PermanentFailureKind};
16use std::marker::PhantomData;
17use std::{
18    sync::{
19        atomic::{AtomicBool, Ordering},
20        Arc,
21    },
22    time::Duration,
23};
24use tokio::task::{AbortHandle, JoinHandle};
25use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument, Level, Span};
26
/// Configuration of a single executor task.
#[derive(Debug, Clone)]
pub struct ExecConfig {
    /// Added to the tick time to obtain both the lock expiry passed to `lock_pending` and the
    /// deadline handed to the worker.
    pub lock_expiry: Duration,
    /// Passed to `wait_for_pending` between two ticks of the spawned executor loop.
    pub tick_sleep: Duration,
    /// Upper bound on the number of task permits acquired, and therefore executions locked,
    /// per tick.
    pub batch_size: u32,
    /// Component identifier passed to `lock_pending` and attached to log events and spans.
    pub component_id: ComponentId,
    /// Optional semaphore shared by worker tasks: each spawned worker holds one permit until it
    /// finishes. With `None`, a tick is bounded only by `batch_size`.
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
}
35
/// Executor state: locks pending executions of the worker's exported functions and runs each
/// of them on the worker.
pub struct ExecTask<C: ClockFn, DB: DbConnection, P: DbPool<DB>> {
    /// Worker invoked for every locked execution.
    worker: Arc<dyn Worker>,
    /// Batch size, lock expiry, tick sleep, component id and optional task limiter.
    config: ExecConfig,
    clock_fn: C, // Used for obtaining current time when the execution finishes.
    /// Source of database connections used for locking executions and appending events.
    db_pool: P,
    /// Identifier passed to `lock_pending` and attached to log events and spans.
    executor_id: ExecutorId,
    /// Ties the otherwise unused `DB` type parameter to the struct.
    phantom_data: PhantomData<DB>,
    /// Function names whose pending executions are locked by this task.
    ffqns: Arc<[FunctionFqn]>,
}
45
/// Join handles of the worker tasks spawned by a single tick.
#[derive(derive_more::Debug, Default)]
pub struct ExecutionProgress {
    /// One `(execution id, join handle)` pair per spawned worker task; skipped in `Debug` output.
    #[debug(skip)]
    #[allow(dead_code)]
    executions: Vec<(ExecutionId, JoinHandle<()>)>,
}
52
53impl ExecutionProgress {
54    #[cfg(feature = "test")]
55    pub async fn wait_for_tasks(self) -> Result<usize, tokio::task::JoinError> {
56        let execs = self.executions.len();
57        for (_, handle) in self.executions {
58            handle.await?;
59        }
60        Ok(execs)
61    }
62}
63
/// Handle to a spawned executor loop; closing it stops the loop gracefully, dropping it aborts
/// the task if it is still running.
#[derive(derive_more::Debug)]
pub struct ExecutorTaskHandle {
    /// Flag shared with the executor loop; set by `close` to request shutdown.
    #[debug(skip)]
    is_closing: Arc<AtomicBool>,
    /// Used to check whether the executor task has finished and to abort it on drop.
    #[debug(skip)]
    abort_handle: AbortHandle,
    /// Copied from the executor's configuration; used in log events and spans.
    component_id: ComponentId,
    /// Identifier of the executor; used in log events and spans.
    executor_id: ExecutorId,
}
73
74impl ExecutorTaskHandle {
75    #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
76    pub async fn close(&self) {
77        trace!("Gracefully closing");
78        self.is_closing.store(true, Ordering::Relaxed);
79        while !self.abort_handle.is_finished() {
80            tokio::time::sleep(Duration::from_millis(1)).await;
81        }
82        debug!("Gracefully closed");
83    }
84}
85
86impl Drop for ExecutorTaskHandle {
87    #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
88    fn drop(&mut self) {
89        if self.abort_handle.is_finished() {
90            return;
91        }
92        warn!(executor_id= %self.executor_id, component_id=%self.component_id, "Aborting the executor task");
93        self.abort_handle.abort();
94    }
95}
96
/// Public wrapper around the crate-private `extract_ffqns`, compiled only with the `test`
/// feature.
#[cfg(feature = "test")]
pub fn extract_ffqns_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    extract_ffqns(worker)
}
101
102pub(crate) fn extract_ffqns(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
103    worker
104        .exported_functions()
105        .iter()
106        .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
107        .collect::<Arc<_>>()
108}
109
110impl<C: ClockFn + 'static, DB: DbConnection + 'static, P: DbPool<DB> + 'static> ExecTask<C, DB, P> {
111    #[cfg(feature = "test")]
112    pub fn new(
113        worker: Arc<dyn Worker>,
114        config: ExecConfig,
115        clock_fn: C,
116        db_pool: P,
117        ffqns: Arc<[FunctionFqn]>,
118    ) -> Self {
119        Self {
120            worker,
121            config,
122            executor_id: ExecutorId::generate(),
123            db_pool,
124            phantom_data: PhantomData,
125            ffqns,
126            clock_fn,
127        }
128    }
129
130    pub fn spawn_new(
131        worker: Arc<dyn Worker>,
132        config: ExecConfig,
133        clock_fn: C,
134        db_pool: P,
135        executor_id: ExecutorId,
136    ) -> ExecutorTaskHandle {
137        let is_closing = Arc::new(AtomicBool::default());
138        let is_closing_inner = is_closing.clone();
139        let ffqns = extract_ffqns(worker.as_ref());
140        let component_id = config.component_id.clone();
141        let abort_handle = tokio::spawn(async move {
142            debug!(%executor_id, component_id = %config.component_id, "Spawned executor");
143            let task = Self {
144                worker,
145                config,
146                executor_id,
147                db_pool,
148                phantom_data: PhantomData,
149                ffqns: ffqns.clone(),
150                clock_fn: clock_fn.clone(),
151            };
152            loop {
153                let _ = task.tick(clock_fn.now()).await;
154                let executed_at = clock_fn.now();
155                task.db_pool
156                    .connection()
157                    .wait_for_pending(executed_at, ffqns.clone(), task.config.tick_sleep)
158                    .await;
159                if is_closing_inner.load(Ordering::Relaxed) {
160                    return;
161                }
162            }
163        })
164        .abort_handle();
165        ExecutorTaskHandle {
166            is_closing,
167            abort_handle,
168            component_id,
169            executor_id,
170        }
171    }
172
173    fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
174        if let Some(task_limiter) = &self.config.task_limiter {
175            let mut locks = Vec::new();
176            for _ in 0..self.config.batch_size {
177                if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
178                    locks.push(Some(permit));
179                } else {
180                    break;
181                }
182            }
183            locks
184        } else {
185            let mut vec = Vec::with_capacity(self.config.batch_size as usize);
186            for _ in 0..self.config.batch_size {
187                vec.push(None);
188            }
189            vec
190        }
191    }
192
    /// Public wrapper around the private `tick`, compiled only with the `test` feature.
    #[cfg(feature = "test")]
    pub async fn tick_test(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
        self.tick(executed_at).await
    }
197
198    #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.executor_id, component_id = %self.config.component_id))]
199    async fn tick(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
200        let locked_executions = {
201            let mut permits = self.acquire_task_permits();
202            if permits.is_empty() {
203                return Ok(ExecutionProgress::default());
204            }
205            let db_connection = self.db_pool.connection();
206            let lock_expires_at = executed_at + self.config.lock_expiry;
207            let locked_executions = db_connection
208                .lock_pending(
209                    permits.len(), // batch size
210                    executed_at,   // fetch expiring before now
211                    self.ffqns.clone(),
212                    executed_at, // created at
213                    self.config.component_id.clone(),
214                    self.executor_id,
215                    lock_expires_at,
216                )
217                .await
218                .map_err(|err| {
219                    warn!(executor_id = %self.executor_id, component_id = %self.config.component_id, "lock_pending error {err:?}");
220                })?;
221            // Drop permits if too many were allocated.
222            while permits.len() > locked_executions.len() {
223                permits.pop();
224            }
225            assert_eq!(permits.len(), locked_executions.len());
226            locked_executions.into_iter().zip(permits)
227        };
228        let execution_deadline = executed_at + self.config.lock_expiry;
229
230        let mut executions = Vec::with_capacity(locked_executions.len());
231        for (locked_execution, permit) in locked_executions {
232            let execution_id = locked_execution.execution_id.clone();
233            let join_handle = {
234                let worker = self.worker.clone();
235                let db_pool = self.db_pool.clone();
236                let clock_fn = self.clock_fn.clone();
237                let run_id = locked_execution.run_id;
238                let worker_span = info_span!(parent: None, "worker",
239                    "otel.name" = format!("{}", locked_execution.ffqn),
240                    %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.executor_id, component_id = %self.config.component_id);
241                locked_execution.metadata.enrich(&worker_span);
242                tokio::spawn({
243                    let worker_span2 = worker_span.clone();
244                    async move {
245                        let res = Self::run_worker(
246                            worker,
247                            &db_pool,
248                            execution_deadline,
249                            clock_fn,
250                            locked_execution,
251                            worker_span2,
252                        )
253                        .await;
254                        if let Err(db_error) = res {
255                            error!("Execution will be timed out not writing `{db_error:?}`");
256                        }
257                        drop(permit);
258                    }
259                    .instrument(worker_span)
260                })
261            };
262            executions.push((execution_id, join_handle));
263        }
264        Ok(ExecutionProgress { executions })
265    }
266
267    async fn run_worker(
268        worker: Arc<dyn Worker>,
269        db_pool: &P,
270        execution_deadline: DateTime<Utc>,
271        clock_fn: C,
272        locked_execution: LockedExecution,
273        worker_span: Span,
274    ) -> Result<(), DbError> {
275        debug!("Worker::run starting");
276        trace!(
277            version = %locked_execution.version,
278            params = ?locked_execution.params,
279            event_history = ?locked_execution.event_history,
280            "Worker::run starting"
281        );
282        let can_be_retried = ExecutionLog::can_be_retried_after(
283            locked_execution.temporary_event_count + 1,
284            locked_execution.max_retries,
285            locked_execution.retry_exp_backoff,
286        );
287        let unlock_expiry_on_limit_reached =
288            ExecutionLog::compute_retry_duration_when_retrying_forever(
289                locked_execution.temporary_event_count + 1,
290                locked_execution.retry_exp_backoff,
291            );
292        let ctx = WorkerContext {
293            execution_id: locked_execution.execution_id.clone(),
294            metadata: locked_execution.metadata,
295            ffqn: locked_execution.ffqn,
296            params: locked_execution.params,
297            event_history: locked_execution.event_history,
298            responses: locked_execution
299                .responses
300                .into_iter()
301                .map(|outer| outer.event)
302                .collect(),
303            version: locked_execution.version,
304            execution_deadline,
305            can_be_retried: can_be_retried.is_some(),
306            run_id: locked_execution.run_id,
307            worker_span,
308        };
309        let worker_result = worker.run(ctx).await;
310        trace!(?worker_result, "Worker::run finished");
311        let result_obtained_at = clock_fn.now();
312        match Self::worker_result_to_execution_event(
313            locked_execution.execution_id,
314            worker_result,
315            result_obtained_at,
316            locked_execution.parent,
317            can_be_retried,
318            unlock_expiry_on_limit_reached,
319        )? {
320            Some(append) => {
321                let db_connection = db_pool.connection();
322                trace!("Appending {append:?}");
323                append.clone().append(&db_connection).await
324            }
325            None => Ok(()),
326        }
327    }
328
    // FIXME: On a slow execution: race between `expired_timers_watcher` and this if `retry_exp_backoff` is 0.
    /// Map the `WorkerResult` to the database write it requires, classifying each `WorkerError`
    /// as a temporary or a permanent failure.
    ///
    /// Returns `Ok(None)` when the executor has nothing to write
    /// (`WorkerResult::DbUpdatedByWorker`, `WorkerError::TemporaryTimeout`); otherwise an
    /// [`Append`] stamped with `result_obtained_at`. A response for the parent is attached only
    /// when `parent` is set and the primary event is `Finished`.
    ///
    /// `can_be_retried` carries the backoff duration when another attempt is allowed;
    /// `unlock_expiry_on_limit_reached` is the backoff used for `WorkerError::LimitReached`.
    ///
    /// # Errors
    ///
    /// Returns the inner [`DbError`] of `WorkerError::DbError`.
    ///
    /// # Panics
    ///
    /// Panics if `can_be_retried` is `None` for `WorkerError::ActivityReturnedError` or
    /// `WorkerError::TemporaryWorkflowTrap`.
    #[expect(clippy::too_many_lines)]
    fn worker_result_to_execution_event(
        execution_id: ExecutionId,
        worker_result: WorkerResult,
        result_obtained_at: DateTime<Utc>,
        parent: Option<(ExecutionId, JoinSetId)>,
        can_be_retried: Option<Duration>,
        unlock_expiry_on_limit_reached: Duration,
    ) -> Result<Option<Append>, DbError> {
        Ok(match worker_result {
            WorkerResult::Ok(result, new_version) => {
                info!(
                    "Execution finished: {}",
                    result.as_pending_state_finished_result()
                );
                // The result is cloned only when there is a parent to notify.
                let child_finished =
                    parent.map(
                        |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
                            parent_execution_id,
                            parent_join_set,
                            result: Ok(result.clone()),
                        },
                    );
                let primary_event = AppendRequest {
                    created_at: result_obtained_at,
                    event: ExecutionEventInner::Finished { result: Ok(result) },
                };

                Some(Append {
                    created_at: result_obtained_at,
                    primary_event,
                    execution_id,
                    version: new_version,
                    child_finished,
                })
            }
            WorkerResult::DbUpdatedByWorker => None,
            WorkerResult::Err(err) => {
                // Full error text, captured before `err` is destructured below.
                let reason = err.to_string();
                let (primary_event, child_finished, version) = match err {
                    WorkerError::TemporaryTimeout => {
                        info!("Temporary timeout");
                        // Will be updated by `expired_timers_watcher`.
                        return Ok(None);
                    }
                    WorkerError::DbError(db_error) => {
                        return Err(db_error);
                    }
                    WorkerError::ActivityTrap {
                        reason: reason_inner,
                        trap_kind,
                        detail,
                        version,
                    } => {
                        if let Some(duration) = can_be_retried {
                            let expires_at = result_obtained_at + duration;
                            debug!(
                                "Retrying activity {trap_kind} execution after {duration:?} at {expires_at}"
                            );
                            (
                                ExecutionEventInner::TemporarilyFailed {
                                    backoff_expires_at: expires_at,
                                    reason_full: StrVariant::from(reason),
                                    reason_inner: StrVariant::from(reason_inner),
                                    detail: Some(detail),
                                },
                                None,
                                version,
                            )
                        } else {
                            // Retries exhausted: the trap becomes a permanent failure.
                            info!(
                                "Activity {trap_kind} marked as permanent failure - {reason_inner}"
                            );
                            let result = Err(FinishedExecutionError::PermanentFailure {
                                reason_inner,
                                reason_full: reason,
                                kind: PermanentFailureKind::ActivityTrap,
                                detail: Some(detail),
                            });
                            let child_finished =
                                parent.map(|(parent_execution_id, parent_join_set)| {
                                    ChildFinishedResponse {
                                        parent_execution_id,
                                        parent_join_set,
                                        result: result.clone(),
                                    }
                                });
                            (
                                ExecutionEventInner::Finished { result },
                                child_finished,
                                version,
                            )
                        }
                    }
                    WorkerError::ActivityReturnedError { detail, version } => {
                        let duration = can_be_retried.expect(
                            "ActivityReturnedError must not be returned when retries are exhausted",
                        );
                        let expires_at = result_obtained_at + duration;
                        debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
                        (
                            ExecutionEventInner::TemporarilyFailed {
                                backoff_expires_at: expires_at,
                                reason_full: StrVariant::Static("activity returned error"),
                                reason_inner: StrVariant::Static("activity returned error"),
                                detail,
                            },
                            None,
                            version,
                        )
                    }
                    WorkerError::TemporaryWorkflowTrap {
                        reason: reason_inner,
                        kind,
                        detail,
                        version,
                    } => {
                        let duration = can_be_retried.expect("workflows are retried forever");
                        let expires_at = result_obtained_at + duration;
                        debug!(
                            "Retrying workflow {kind} execution after {duration:?} at {expires_at}"
                        );
                        (
                            ExecutionEventInner::TemporarilyFailed {
                                backoff_expires_at: expires_at,
                                reason_full: StrVariant::from(reason),
                                reason_inner: StrVariant::from(reason_inner),
                                detail,
                            },
                            None,
                            version,
                        )
                    }
                    WorkerError::LimitReached {
                        reason: inner_reason,
                        version: new_version,
                    } => {
                        let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
                        warn!(
                            "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
                        );
                        // The execution is unlocked rather than failed; the stored reason is the
                        // full error text.
                        (
                            ExecutionEventInner::Unlocked {
                                backoff_expires_at: expires_at,
                                reason: StrVariant::from(reason),
                            },
                            None,
                            new_version,
                        )
                    }
                    WorkerError::FatalError(fatal_error, version) => {
                        info!("Fatal worker error - {fatal_error:?}");
                        let result = Err(FinishedExecutionError::from(fatal_error));
                        let child_finished =
                            parent.map(|(parent_execution_id, parent_join_set)| {
                                ChildFinishedResponse {
                                    parent_execution_id,
                                    parent_join_set,
                                    result: result.clone(),
                                }
                            });
                        (
                            ExecutionEventInner::Finished { result },
                            child_finished,
                            version,
                        )
                    }
                };
                Some(Append {
                    created_at: result_obtained_at,
                    primary_event: AppendRequest {
                        created_at: result_obtained_at,
                        event: primary_event,
                    },
                    execution_id,
                    version,
                    child_finished,
                })
            }
        })
    }
512}
513
/// Data needed to tell a parent execution that one of its child executions has finished.
#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    /// Execution that receives the response.
    pub(crate) parent_execution_id: ExecutionId,
    /// Join set of the parent that the response is addressed to.
    pub(crate) parent_join_set: JoinSetId,
    /// Result of the finished child execution.
    pub(crate) result: FinishedExecutionResult,
}
520
/// A pending database write produced from a worker result: one primary event and, optionally,
/// a response for the parent execution.
#[derive(Debug, Clone)]
pub(crate) struct Append {
    /// Timestamp passed to the database call and used for the parent response.
    pub(crate) created_at: DateTime<Utc>,
    /// Event appended to the execution's own log.
    pub(crate) primary_event: AppendRequest,
    /// Execution whose log is appended to.
    pub(crate) execution_id: ExecutionId,
    /// Version passed to the append call.
    pub(crate) version: Version,
    /// When set, the parent is notified in the same database call.
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}
529
530impl Append {
531    pub(crate) async fn append(self, db_connection: &impl DbConnection) -> Result<(), DbError> {
532        if let Some(child_finished) = self.child_finished {
533            assert_matches!(
534                &self.primary_event,
535                AppendRequest {
536                    event: ExecutionEventInner::Finished { .. },
537                    ..
538                }
539            );
540            let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
541            db_connection
542                .append_batch_respond_to_parent(
543                    derived.clone(),
544                    self.created_at,
545                    vec![self.primary_event],
546                    self.version.clone(),
547                    child_finished.parent_execution_id,
548                    JoinSetResponseEventOuter {
549                        created_at: self.created_at,
550                        event: JoinSetResponseEvent {
551                            join_set_id: child_finished.parent_join_set,
552                            event: JoinSetResponse::ChildExecutionFinished {
553                                child_execution_id: derived,
554                                // Since self.primary_event is a finished event, the version will remain the same.
555                                finished_version: self.version,
556                                result: child_finished.result,
557                            },
558                        },
559                    },
560                )
561                .await?;
562        } else {
563            db_connection
564                .append_batch(
565                    self.created_at,
566                    vec![self.primary_event],
567                    self.execution_id,
568                    self.version,
569                )
570                .await?;
571        }
572        Ok(())
573    }
574}
575
/// A scripted [`Worker`](crate::worker::Worker) for tests: every run pops the next prepared
/// result and checks the version and event history it was called with.
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        storage::{HistoryEvent, Version},
        FunctionFqn, FunctionMetadata, ParameterTypes,
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    /// Function name exported by a `SimpleWorker` unless overridden with `with_ffqn`.
    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");
    /// Prepared runs keyed by expected version; each entry holds the expected event history and
    /// the result to return. Entries are consumed from the end.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    /// Worker returning prepared results and exporting exactly one function.
    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    impl SimpleWorker {
        /// Builds the single-element export list for `ffqn`.
        fn single_export(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
            [FunctionMetadata {
                ffqn,
                parameter_types: ParameterTypes::default(),
                return_type: None,
                extension: None,
                submittable: true,
            }]
        }

        /// Creates a worker with one prepared run: version 2, empty event history, result `res`.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let mut prepared = IndexMap::new();
            prepared.insert(Version::new(2), (Vec::new(), res));
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(prepared)))
        }

        /// Returns the same worker exporting `ffqn` instead of its previous function name.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            let Self {
                worker_results_rev, ..
            } = self;
            Self {
                worker_results_rev,
                exported: Self::single_export(ffqn.clone()),
                ffqn,
            }
        }

        /// Creates a worker over the given prepared runs, exporting `FFQN_SOME`.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                exported: Self::single_export(FFQN_SOME),
                ffqn: FFQN_SOME,
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        /// Pops the last prepared run, asserts that `ctx` carries the expected version and
        /// event history, and returns the prepared result.
        ///
        /// Panics when no prepared run is left or an expectation does not hold.
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            let (expected_version, (expected_eh, worker_result)) =
                self.worker_results_rev.lock().unwrap().pop().unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            self.exported.as_slice()
        }

        fn imported_functions(&self) -> &[FunctionMetadata] {
            &[]
        }
    }
}
659
660#[cfg(test)]
661mod tests {
662    use self::simple_worker::SimpleWorker;
663    use super::*;
664    use crate::worker::FatalError;
665    use crate::{expired_timers_watcher, worker::WorkerResult};
666    use assert_matches::assert_matches;
667    use async_trait::async_trait;
668    use concepts::storage::{CreateRequest, JoinSetRequest};
669    use concepts::storage::{
670        DbConnection, ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState,
671    };
672    use concepts::time::Now;
673    use concepts::{
674        ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, StrVariant,
675        SupportedFunctionReturnValue, TrapKind,
676    };
677    use db_tests::Database;
678    use indexmap::IndexMap;
679    use simple_worker::FFQN_SOME;
680    use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
681    use test_utils::set_up;
682    use test_utils::sim_clock::{ConstClock, SimClock};
683
    // NOTE(review): presumably the function name used for child executions in tests; no use is
    // visible in this part of the file — confirm.
    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
685
686    async fn tick_fn<
687        W: Worker + Debug,
688        C: ClockFn + 'static,
689        DB: DbConnection + 'static,
690        P: DbPool<DB> + 'static,
691    >(
692        config: ExecConfig,
693        clock_fn: C,
694        db_pool: P,
695        worker: Arc<W>,
696        executed_at: DateTime<Utc>,
697    ) -> ExecutionProgress {
698        trace!("Ticking with {worker:?}");
699        let ffqns = super::extract_ffqns(worker.as_ref());
700        let executor = ExecTask::new(worker, config, clock_fn, db_pool, ffqns);
701        let mut execution_progress = executor.tick(executed_at).await.unwrap();
702        loop {
703            execution_progress
704                .executions
705                .retain(|(_, abort_handle)| !abort_handle.is_finished());
706            if execution_progress.executions.is_empty() {
707                return execution_progress;
708            }
709            tokio::time::sleep(Duration::from_millis(10)).await;
710        }
711    }
712
713    #[tokio::test]
714    async fn execute_simple_lifecycle_tick_based_mem() {
715        let created_at = Now.now();
716        let (_guard, db_pool) = Database::Memory.set_up().await;
717        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
718        db_pool.close().await.unwrap();
719    }
720
721    #[cfg(not(madsim))]
722    #[tokio::test]
723    async fn execute_simple_lifecycle_tick_based_sqlite() {
724        let created_at = Now.now();
725        let (_guard, db_pool) = Database::Sqlite.set_up().await;
726        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
727        db_pool.close().await.unwrap();
728    }
729
730    async fn execute_simple_lifecycle_tick_based<
731        DB: DbConnection + 'static,
732        P: DbPool<DB> + 'static,
733        C: ClockFn + 'static,
734    >(
735        pool: P,
736        clock_fn: C,
737    ) {
738        set_up();
739        let created_at = clock_fn.now();
740        let exec_config = ExecConfig {
741            batch_size: 1,
742            lock_expiry: Duration::from_secs(1),
743            tick_sleep: Duration::from_millis(100),
744            component_id: ComponentId::dummy_activity(),
745            task_limiter: None,
746        };
747
748        let execution_log = create_and_tick(
749            CreateAndTickConfig {
750                execution_id: ExecutionId::generate(),
751                created_at,
752                max_retries: 0,
753                executed_at: created_at,
754                retry_exp_backoff: Duration::ZERO,
755            },
756            clock_fn,
757            pool,
758            exec_config,
759            Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
760                SupportedFunctionReturnValue::None,
761                Version::new(2),
762            ))),
763            tick_fn,
764        )
765        .await;
766        assert_matches!(
767            execution_log.events.get(2).unwrap(),
768            ExecutionEvent {
769                event: ExecutionEventInner::Finished {
770                    result: Ok(SupportedFunctionReturnValue::None),
771                },
772                created_at: _,
773            }
774        );
775    }
776
777    #[tokio::test]
778    async fn stochastic_execute_simple_lifecycle_task_based_mem() {
779        set_up();
780        let created_at = Now.now();
781        let clock_fn = ConstClock(created_at);
782        let (_guard, db_pool) = Database::Memory.set_up().await;
783        let exec_config = ExecConfig {
784            batch_size: 1,
785            lock_expiry: Duration::from_secs(1),
786            tick_sleep: Duration::ZERO,
787            component_id: ComponentId::dummy_activity(),
788            task_limiter: None,
789        };
790
791        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
792            SupportedFunctionReturnValue::None,
793            Version::new(2),
794        )));
795        let exec_task = ExecTask::spawn_new(
796            worker.clone(),
797            exec_config.clone(),
798            clock_fn,
799            db_pool.clone(),
800            ExecutorId::generate(),
801        );
802
803        let execution_log = create_and_tick(
804            CreateAndTickConfig {
805                execution_id: ExecutionId::generate(),
806                created_at,
807                max_retries: 0,
808                executed_at: created_at,
809                retry_exp_backoff: Duration::ZERO,
810            },
811            clock_fn,
812            db_pool.clone(),
813            exec_config,
814            worker,
815            |_, _, _, _, _| async {
816                tokio::time::sleep(Duration::from_secs(1)).await; // non deterministic if not run in madsim
817                ExecutionProgress::default()
818            },
819        )
820        .await;
821        exec_task.close().await;
822        db_pool.close().await.unwrap();
823        assert_matches!(
824            execution_log.events.get(2).unwrap(),
825            ExecutionEvent {
826                event: ExecutionEventInner::Finished {
827                    result: Ok(SupportedFunctionReturnValue::None),
828                },
829                created_at: _,
830            }
831        );
832    }
833
834    struct CreateAndTickConfig {
835        execution_id: ExecutionId,
836        created_at: DateTime<Utc>,
837        max_retries: u32,
838        executed_at: DateTime<Utc>,
839        retry_exp_backoff: Duration,
840    }
841
842    async fn create_and_tick<
843        W: Worker,
844        C: ClockFn,
845        DB: DbConnection,
846        P: DbPool<DB>,
847        T: FnMut(ExecConfig, C, P, Arc<W>, DateTime<Utc>) -> F,
848        F: Future<Output = ExecutionProgress>,
849    >(
850        config: CreateAndTickConfig,
851        clock_fn: C,
852        db_pool: P,
853        exec_config: ExecConfig,
854        worker: Arc<W>,
855        mut tick: T,
856    ) -> ExecutionLog {
857        // Create an execution
858        let db_connection = db_pool.connection();
859        db_connection
860            .create(CreateRequest {
861                created_at: config.created_at,
862                execution_id: config.execution_id.clone(),
863                ffqn: FFQN_SOME,
864                params: Params::empty(),
865                parent: None,
866                metadata: concepts::ExecutionMetadata::empty(),
867                scheduled_at: config.created_at,
868                retry_exp_backoff: config.retry_exp_backoff,
869                max_retries: config.max_retries,
870                component_id: ComponentId::dummy_activity(),
871                scheduled_by: None,
872            })
873            .await
874            .unwrap();
875        // execute!
876        tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
877        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
878        debug!("Execution history after tick: {execution_log:?}");
879        // check that DB contains Created and Locked events.
880        assert_matches!(
881            execution_log.events.first().unwrap(),
882            ExecutionEvent {
883                event: ExecutionEventInner::Created { .. },
884                created_at: actually_created_at,
885            }
886            if config.created_at == *actually_created_at
887        );
888        let locked_at = assert_matches!(
889            execution_log.events.get(1).unwrap(),
890            ExecutionEvent {
891                event: ExecutionEventInner::Locked { .. },
892                created_at: locked_at
893            } if config.created_at <= *locked_at
894            => *locked_at
895        );
896        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
897            event: _,
898            created_at: executed_at,
899        } if *executed_at >= locked_at);
900        execution_log
901    }
902
903    #[expect(clippy::too_many_lines)]
904    #[tokio::test]
905    async fn activity_trap_should_trigger_an_execution_retry() {
906        set_up();
907        let sim_clock = SimClock::default();
908        let (_guard, db_pool) = Database::Memory.set_up().await;
909        let exec_config = ExecConfig {
910            batch_size: 1,
911            lock_expiry: Duration::from_secs(1),
912            tick_sleep: Duration::ZERO,
913            component_id: ComponentId::dummy_activity(),
914            task_limiter: None,
915        };
916        let expected_reason = "error reason";
917        let expected_detail = "error detail";
918        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
919            WorkerError::ActivityTrap {
920                reason: expected_reason.to_string(),
921                trap_kind: concepts::TrapKind::Trap,
922                detail: expected_detail.to_string(),
923                version: Version::new(2),
924            },
925        )));
926        let retry_exp_backoff = Duration::from_millis(100);
927        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
928        let execution_log = create_and_tick(
929            CreateAndTickConfig {
930                execution_id: ExecutionId::generate(),
931                created_at: sim_clock.now(),
932                max_retries: 1,
933                executed_at: sim_clock.now(),
934                retry_exp_backoff,
935            },
936            sim_clock.clone(),
937            db_pool.clone(),
938            exec_config.clone(),
939            worker,
940            tick_fn,
941        )
942        .await;
943        assert_eq!(3, execution_log.events.len());
944        {
945            let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
946                &execution_log.events.get(2).unwrap(),
947                ExecutionEvent {
948                    event: ExecutionEventInner::TemporarilyFailed {
949                        reason_inner,
950                        reason_full,
951                        detail,
952                        backoff_expires_at,
953                    },
954                    created_at: at,
955                }
956                => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
957            );
958            assert_eq!(expected_reason, reason_inner.deref());
959            assert_eq!(
960                format!("activity trap: {expected_reason}"),
961                reason_full.deref()
962            );
963            assert_eq!(Some(expected_detail), detail.as_deref());
964            assert_eq!(at, sim_clock.now());
965            assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
966        }
967        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
968            std::sync::Mutex::new(IndexMap::from([(
969                Version::new(4),
970                (
971                    vec![],
972                    WorkerResult::Ok(SupportedFunctionReturnValue::None, Version::new(4)),
973                ),
974            )])),
975        )));
976        // noop until `retry_exp_backoff` expires
977        assert!(tick_fn(
978            exec_config.clone(),
979            sim_clock.clone(),
980            db_pool.clone(),
981            worker.clone(),
982            sim_clock.now(),
983        )
984        .await
985        .executions
986        .is_empty());
987        // tick again to finish the execution
988        sim_clock.move_time_forward(retry_exp_backoff).await;
989        tick_fn(
990            exec_config,
991            sim_clock.clone(),
992            db_pool.clone(),
993            worker,
994            sim_clock.now(),
995        )
996        .await;
997        let execution_log = {
998            let db_connection = db_pool.connection();
999            db_connection
1000                .get(&execution_log.execution_id)
1001                .await
1002                .unwrap()
1003        };
1004        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
1005        assert_matches!(
1006            execution_log.events.get(3).unwrap(),
1007            ExecutionEvent {
1008                event: ExecutionEventInner::Locked { .. },
1009                created_at: at
1010            } if *at == sim_clock.now()
1011        );
1012        assert_matches!(
1013            execution_log.events.get(4).unwrap(),
1014            ExecutionEvent {
1015                event: ExecutionEventInner::Finished {
1016                    result: Ok(SupportedFunctionReturnValue::None),
1017                },
1018                created_at: finished_at,
1019            } if *finished_at == sim_clock.now()
1020        );
1021        db_pool.close().await.unwrap();
1022    }
1023
1024    #[tokio::test]
1025    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1026        set_up();
1027        let created_at = Now.now();
1028        let clock_fn = ConstClock(created_at);
1029        let (_guard, db_pool) = Database::Memory.set_up().await;
1030        let exec_config = ExecConfig {
1031            batch_size: 1,
1032            lock_expiry: Duration::from_secs(1),
1033            tick_sleep: Duration::ZERO,
1034            component_id: ComponentId::dummy_activity(),
1035            task_limiter: None,
1036        };
1037
1038        let expected_reason = "error reason";
1039        let expected_detail = "error detail";
1040        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1041            WorkerError::ActivityTrap {
1042                reason: expected_reason.to_string(),
1043                trap_kind: concepts::TrapKind::Trap,
1044                detail: expected_detail.to_string(),
1045                version: Version::new(2),
1046            },
1047        )));
1048        let execution_log = create_and_tick(
1049            CreateAndTickConfig {
1050                execution_id: ExecutionId::generate(),
1051                created_at,
1052                max_retries: 0,
1053                executed_at: created_at,
1054                retry_exp_backoff: Duration::ZERO,
1055            },
1056            clock_fn,
1057            db_pool.clone(),
1058            exec_config.clone(),
1059            worker,
1060            tick_fn,
1061        )
1062        .await;
1063        assert_eq!(3, execution_log.events.len());
1064        let (reason, kind, detail) = assert_matches!(
1065            &execution_log.events.get(2).unwrap(),
1066            ExecutionEvent {
1067                event: ExecutionEventInner::Finished{
1068                    result: Err(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_})
1069                },
1070                created_at: at,
1071            } if *at == created_at
1072            => (reason_inner, kind, detail)
1073        );
1074        assert_eq!(expected_reason, *reason);
1075        assert_eq!(Some(expected_detail), detail.as_deref());
1076        assert_eq!(PermanentFailureKind::ActivityTrap, *kind);
1077
1078        db_pool.close().await.unwrap();
1079    }
1080
1081    #[tokio::test]
1082    async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1083        let worker_error = WorkerError::ActivityTrap {
1084            reason: "error reason".to_string(),
1085            trap_kind: TrapKind::Trap,
1086            detail: "detail".to_string(),
1087            version: Version::new(2),
1088        };
1089        let expected_child_err = FinishedExecutionError::PermanentFailure {
1090            reason_full: "activity trap: error reason".to_string(),
1091            reason_inner: "error reason".to_string(),
1092            kind: PermanentFailureKind::ActivityTrap,
1093            detail: Some("detail".to_string()),
1094        };
1095        child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
1096            .await;
1097    }
1098
1099    #[tokio::test]
1100    async fn child_execution_permanently_failed_should_notify_parent_timeout() {
1101        let worker_error = WorkerError::TemporaryTimeout;
1102        let expected_child_err = FinishedExecutionError::PermanentTimeout;
1103        child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
1104            .await;
1105    }
1106
1107    #[tokio::test]
1108    async fn child_execution_permanently_failed_should_notify_parent_unhandled_child() {
1109        let parent_id = ExecutionId::from_parts(1, 1);
1110        let join_set_id_outer =
1111            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("outer")).unwrap();
1112        let root_cause_id = parent_id.next_level(&join_set_id_outer);
1113        let join_set_id_inner =
1114            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("inner")).unwrap();
1115        let child_execution_id = root_cause_id.next_level(&join_set_id_inner);
1116        let worker_error = WorkerError::FatalError(
1117            FatalError::UnhandledChildExecutionError {
1118                child_execution_id: child_execution_id.clone(),
1119                root_cause_id: root_cause_id.clone(),
1120            },
1121            Version::new(2),
1122        );
1123        let expected_child_err = FinishedExecutionError::UnhandledChildExecutionError {
1124            child_execution_id,
1125            root_cause_id,
1126        };
1127        child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
1128            .await;
1129    }
1130
1131    #[expect(clippy::too_many_lines)]
1132    async fn child_execution_permanently_failed_should_notify_parent(
1133        worker_error: WorkerError,
1134        expected_child_err: FinishedExecutionError,
1135    ) {
1136        use concepts::storage::JoinSetResponseEventOuter;
1137        const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1138
1139        set_up();
1140        let sim_clock = SimClock::default();
1141        let (_guard, db_pool) = Database::Memory.set_up().await;
1142
1143        let parent_worker = Arc::new(SimpleWorker::with_single_result(
1144            WorkerResult::DbUpdatedByWorker,
1145        ));
1146        let parent_execution_id = ExecutionId::generate();
1147        db_pool
1148            .connection()
1149            .create(CreateRequest {
1150                created_at: sim_clock.now(),
1151                execution_id: parent_execution_id.clone(),
1152                ffqn: FFQN_SOME,
1153                params: Params::empty(),
1154                parent: None,
1155                metadata: concepts::ExecutionMetadata::empty(),
1156                scheduled_at: sim_clock.now(),
1157                retry_exp_backoff: Duration::ZERO,
1158                max_retries: 0,
1159                component_id: ComponentId::dummy_activity(),
1160                scheduled_by: None,
1161            })
1162            .await
1163            .unwrap();
1164        tick_fn(
1165            ExecConfig {
1166                batch_size: 1,
1167                lock_expiry: LOCK_EXPIRY,
1168                tick_sleep: Duration::ZERO,
1169                component_id: ComponentId::dummy_activity(),
1170                task_limiter: None,
1171            },
1172            sim_clock.clone(),
1173            db_pool.clone(),
1174            parent_worker,
1175            sim_clock.now(),
1176        )
1177        .await;
1178
1179        let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1180        let child_execution_id = parent_execution_id.next_level(&join_set_id);
1181        // executor does not append anything, this should have been written by the worker:
1182        {
1183            let child = CreateRequest {
1184                created_at: sim_clock.now(),
1185                execution_id: ExecutionId::Derived(child_execution_id.clone()),
1186                ffqn: FFQN_CHILD,
1187                params: Params::empty(),
1188                parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1189                metadata: concepts::ExecutionMetadata::empty(),
1190                scheduled_at: sim_clock.now(),
1191                retry_exp_backoff: Duration::ZERO,
1192                max_retries: 0,
1193                component_id: ComponentId::dummy_activity(),
1194                scheduled_by: None,
1195            };
1196            let current_time = sim_clock.now();
1197            let join_set = AppendRequest {
1198                created_at: current_time,
1199                event: ExecutionEventInner::HistoryEvent {
1200                    event: HistoryEvent::JoinSetCreate {
1201                        join_set_id: join_set_id.clone(),
1202                        closing_strategy: ClosingStrategy::Complete,
1203                    },
1204                },
1205            };
1206            let child_exec_req = AppendRequest {
1207                created_at: current_time,
1208                event: ExecutionEventInner::HistoryEvent {
1209                    event: HistoryEvent::JoinSetRequest {
1210                        join_set_id: join_set_id.clone(),
1211                        request: JoinSetRequest::ChildExecutionRequest {
1212                            child_execution_id: child_execution_id.clone(),
1213                        },
1214                    },
1215                },
1216            };
1217            let join_next = AppendRequest {
1218                created_at: current_time,
1219                event: ExecutionEventInner::HistoryEvent {
1220                    event: HistoryEvent::JoinNext {
1221                        join_set_id: join_set_id.clone(),
1222                        run_expires_at: sim_clock.now(),
1223                        closing: false,
1224                    },
1225                },
1226            };
1227            db_pool
1228                .connection()
1229                .append_batch_create_new_execution(
1230                    current_time,
1231                    vec![join_set, child_exec_req, join_next],
1232                    parent_execution_id.clone(),
1233                    Version::new(2),
1234                    vec![child],
1235                )
1236                .await
1237                .unwrap();
1238        }
1239
1240        let child_worker = Arc::new(
1241            SimpleWorker::with_single_result(WorkerResult::Err(worker_error)).with_ffqn(FFQN_CHILD),
1242        );
1243
1244        // execute the child
1245        tick_fn(
1246            ExecConfig {
1247                batch_size: 1,
1248                lock_expiry: LOCK_EXPIRY,
1249                tick_sleep: Duration::ZERO,
1250                component_id: ComponentId::dummy_activity(),
1251                task_limiter: None,
1252            },
1253            sim_clock.clone(),
1254            db_pool.clone(),
1255            child_worker,
1256            sim_clock.now(),
1257        )
1258        .await;
1259        if expected_child_err == FinishedExecutionError::PermanentTimeout {
1260            // In case of timeout, let the timers watcher handle it
1261            sim_clock.move_time_forward(LOCK_EXPIRY).await;
1262            expired_timers_watcher::tick(&db_pool.connection(), sim_clock.now())
1263                .await
1264                .unwrap();
1265        }
1266        let child_log = db_pool
1267            .connection()
1268            .get(&ExecutionId::Derived(child_execution_id.clone()))
1269            .await
1270            .unwrap();
1271        assert!(child_log.pending_state.is_finished());
1272        assert_eq!(
1273            Version(2),
1274            child_log.next_version,
1275            "created = 0, locked = 1, with_single_result = 2"
1276        );
1277        assert_eq!(
1278            ExecutionEventInner::Finished {
1279                result: Err(expected_child_err)
1280            },
1281            child_log.last_event().event
1282        );
1283        let parent_log = db_pool
1284            .connection()
1285            .get(&parent_execution_id)
1286            .await
1287            .unwrap();
1288        assert_matches!(
1289            parent_log.pending_state,
1290            PendingState::PendingAt {
1291                scheduled_at
1292            } if scheduled_at == sim_clock.now(),
1293            "parent should be back to pending"
1294        );
1295        let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1296            parent_log.responses.last(),
1297            Some(JoinSetResponseEventOuter{
1298                created_at: at,
1299                event: JoinSetResponseEvent{
1300                    join_set_id: found_join_set_id,
1301                    event: JoinSetResponse::ChildExecutionFinished {
1302                        child_execution_id: found_child_execution_id,
1303                        finished_version,
1304                        result: found_result,
1305                    }
1306                }
1307            })
1308             if *at == sim_clock.now()
1309            => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1310        );
1311        assert_eq!(join_set_id, *found_join_set_id);
1312        assert_eq!(child_execution_id, *found_child_execution_id);
1313        assert_eq!(child_log.next_version, *child_finished_version);
1314        assert!(found_result.is_err());
1315
1316        db_pool.close().await.unwrap();
1317    }
1318
1319    #[derive(Clone, Debug)]
1320    struct SleepyWorker {
1321        duration: Duration,
1322        result: SupportedFunctionReturnValue,
1323        exported: [FunctionMetadata; 1],
1324    }
1325
1326    #[async_trait]
1327    impl Worker for SleepyWorker {
1328        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1329            tokio::time::sleep(self.duration).await;
1330            WorkerResult::Ok(self.result.clone(), ctx.version)
1331        }
1332
1333        fn exported_functions(&self) -> &[FunctionMetadata] {
1334            &self.exported
1335        }
1336
1337        fn imported_functions(&self) -> &[FunctionMetadata] {
1338            &[]
1339        }
1340    }
1341
1342    #[expect(clippy::too_many_lines)]
1343    #[tokio::test]
1344    async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1345        set_up();
1346        let sim_clock = SimClock::default();
1347        let (_guard, db_pool) = Database::Memory.set_up().await;
1348        let lock_expiry = Duration::from_millis(100);
1349        let exec_config = ExecConfig {
1350            batch_size: 1,
1351            lock_expiry,
1352            tick_sleep: Duration::ZERO,
1353            component_id: ComponentId::dummy_activity(),
1354            task_limiter: None,
1355        };
1356
1357        let worker = Arc::new(SleepyWorker {
1358            duration: lock_expiry + Duration::from_millis(1), // sleep more than allowed by the lock expiry
1359            result: SupportedFunctionReturnValue::None,
1360            exported: [FunctionMetadata {
1361                ffqn: FFQN_SOME,
1362                parameter_types: ParameterTypes::default(),
1363                return_type: None,
1364                extension: None,
1365                submittable: true,
1366            }],
1367        });
1368        // Create an execution
1369        let execution_id = ExecutionId::generate();
1370        let timeout_duration = Duration::from_millis(300);
1371        let db_connection = db_pool.connection();
1372        db_connection
1373            .create(CreateRequest {
1374                created_at: sim_clock.now(),
1375                execution_id: execution_id.clone(),
1376                ffqn: FFQN_SOME,
1377                params: Params::empty(),
1378                parent: None,
1379                metadata: concepts::ExecutionMetadata::empty(),
1380                scheduled_at: sim_clock.now(),
1381                retry_exp_backoff: timeout_duration,
1382                max_retries: 1,
1383                component_id: ComponentId::dummy_activity(),
1384                scheduled_by: None,
1385            })
1386            .await
1387            .unwrap();
1388
1389        let ffqns = super::extract_ffqns(worker.as_ref());
1390        let executor = ExecTask::new(
1391            worker,
1392            exec_config,
1393            sim_clock.clone(),
1394            db_pool.clone(),
1395            ffqns,
1396        );
1397        let mut first_execution_progress = executor.tick(sim_clock.now()).await.unwrap();
1398        assert_eq!(1, first_execution_progress.executions.len());
1399        // Started hanging, wait for lock expiry.
1400        sim_clock.move_time_forward(lock_expiry).await;
1401        // cleanup should be called
1402        let now_after_first_lock_expiry = sim_clock.now();
1403        {
1404            debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1405            let cleanup_progress = executor.tick(now_after_first_lock_expiry).await.unwrap();
1406            assert!(cleanup_progress.executions.is_empty());
1407        }
1408        {
1409            let expired_locks =
1410                expired_timers_watcher::tick(&db_pool.connection(), now_after_first_lock_expiry)
1411                    .await
1412                    .unwrap()
1413                    .expired_locks;
1414            assert_eq!(1, expired_locks);
1415        }
1416        assert!(!first_execution_progress
1417            .executions
1418            .pop()
1419            .unwrap()
1420            .1
1421            .is_finished());
1422
1423        let execution_log = db_connection.get(&execution_id).await.unwrap();
1424        let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1425        assert_matches!(
1426            &execution_log.events.get(2).unwrap(),
1427            ExecutionEvent {
1428                event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at },
1429                created_at: at,
1430            } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1431        );
1432        assert_eq!(
1433            PendingState::PendingAt {
1434                scheduled_at: expected_first_timeout_expiry
1435            },
1436            execution_log.pending_state
1437        );
1438        sim_clock.move_time_forward(timeout_duration).await;
1439        let now_after_first_timeout = sim_clock.now();
1440        debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1441
1442        let mut second_execution_progress = executor.tick(now_after_first_timeout).await.unwrap();
1443        assert_eq!(1, second_execution_progress.executions.len());
1444
1445        // Started hanging, wait for lock expiry.
1446        sim_clock.move_time_forward(lock_expiry).await;
1447        // cleanup should be called
1448        let now_after_second_lock_expiry = sim_clock.now();
1449        debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1450        {
1451            let cleanup_progress = executor.tick(now_after_second_lock_expiry).await.unwrap();
1452            assert!(cleanup_progress.executions.is_empty());
1453        }
1454        {
1455            let expired_locks =
1456                expired_timers_watcher::tick(&db_pool.connection(), now_after_second_lock_expiry)
1457                    .await
1458                    .unwrap()
1459                    .expired_locks;
1460            assert_eq!(1, expired_locks);
1461        }
1462        assert!(!second_execution_progress
1463            .executions
1464            .pop()
1465            .unwrap()
1466            .1
1467            .is_finished());
1468
1469        drop(db_connection);
1470        drop(executor);
1471        db_pool.close().await.unwrap();
1472    }
1473}