obeli_sk_executor/
executor.rs

1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::prefixed_ulid::RunId;
5use concepts::storage::{
6    AppendRequest, DbPool, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
7    LockedExecution,
8};
9use concepts::time::ClockFn;
10use concepts::{ComponentId, FunctionMetadata, StrVariant, SupportedFunctionReturnValue};
11use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
12use concepts::{
13    FinishedExecutionError,
14    storage::{DbConnection, DbError, ExecutionEventInner, JoinSetResponse, Version},
15};
16use concepts::{JoinSetId, PermanentFailureKind};
17use std::fmt::Display;
18use std::{
19    sync::{
20        Arc,
21        atomic::{AtomicBool, Ordering},
22    },
23    time::Duration,
24};
25use tokio::task::{AbortHandle, JoinHandle};
26use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
27
28#[derive(Debug, Clone)]
29pub struct ExecConfig {
30    pub lock_expiry: Duration,
31    pub tick_sleep: Duration,
32    pub batch_size: u32,
33    pub component_id: ComponentId,
34    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
35    pub executor_id: ExecutorId,
36}
37
38pub struct ExecTask<C: ClockFn> {
39    worker: Arc<dyn Worker>,
40    config: ExecConfig,
41    clock_fn: C, // Used for obtaining current time when the execution finishes.
42    db_pool: Arc<dyn DbPool>,
43    ffqns: Arc<[FunctionFqn]>,
44}
45
46#[derive(derive_more::Debug, Default)]
47pub struct ExecutionProgress {
48    #[debug(skip)]
49    #[allow(dead_code)]
50    executions: Vec<(ExecutionId, JoinHandle<()>)>,
51}
52
53impl ExecutionProgress {
54    #[cfg(feature = "test")]
55    pub async fn wait_for_tasks(self) -> Result<usize, tokio::task::JoinError> {
56        let execs = self.executions.len();
57        for (_, handle) in self.executions {
58            handle.await?;
59        }
60        Ok(execs)
61    }
62}
63
64#[derive(derive_more::Debug)]
65pub struct ExecutorTaskHandle {
66    #[debug(skip)]
67    is_closing: Arc<AtomicBool>,
68    #[debug(skip)]
69    abort_handle: AbortHandle,
70    component_id: ComponentId,
71    executor_id: ExecutorId,
72}
73
74impl ExecutorTaskHandle {
75    #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
76    pub async fn close(&self) {
77        trace!("Gracefully closing");
78        self.is_closing.store(true, Ordering::Relaxed);
79        while !self.abort_handle.is_finished() {
80            tokio::time::sleep(Duration::from_millis(1)).await;
81        }
82        debug!("Gracefully closed");
83    }
84}
85
86impl Drop for ExecutorTaskHandle {
87    #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
88    fn drop(&mut self) {
89        if self.abort_handle.is_finished() {
90            return;
91        }
92        warn!("Aborting the executor task");
93        self.abort_handle.abort();
94    }
95}
96
/// Test-only public access to `extract_exported_ffqns_noext`.
#[cfg(feature = "test")]
pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    self::extract_exported_ffqns_noext(worker)
}
101
102fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
103    worker
104        .exported_functions()
105        .iter()
106        .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
107        .collect::<Arc<_>>()
108}
109
110impl<C: ClockFn + 'static> ExecTask<C> {
111    #[cfg(feature = "test")]
112    pub fn new(
113        worker: Arc<dyn Worker>,
114        config: ExecConfig,
115        clock_fn: C,
116        db_pool: Arc<dyn DbPool>,
117        ffqns: Arc<[FunctionFqn]>,
118    ) -> Self {
119        Self {
120            worker,
121            config,
122            clock_fn,
123            db_pool,
124            ffqns,
125        }
126    }
127
128    pub fn spawn_new(
129        worker: Arc<dyn Worker>,
130        config: ExecConfig,
131        clock_fn: C,
132        db_pool: Arc<dyn DbPool>,
133    ) -> ExecutorTaskHandle {
134        let is_closing = Arc::new(AtomicBool::default());
135        let is_closing_inner = is_closing.clone();
136        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
137        let component_id = config.component_id.clone();
138        let executor_id = config.executor_id;
139        let abort_handle = tokio::spawn(async move {
140            debug!(executor_id = %config.executor_id, component_id = %config.component_id, "Spawned executor");
141            let task = ExecTask {
142                worker,
143                config,
144                db_pool,
145                ffqns: ffqns.clone(),
146                clock_fn: clock_fn.clone(),
147            };
148            loop {
149                let _ = task.tick(clock_fn.now(), RunId::generate()).await;
150                let executed_at = clock_fn.now();
151                task.db_pool
152                    .connection()
153                    .wait_for_pending(executed_at, ffqns.clone(), task.config.tick_sleep)
154                    .await;
155                if is_closing_inner.load(Ordering::Relaxed) {
156                    return;
157                }
158            }
159        })
160        .abort_handle();
161        ExecutorTaskHandle {
162            is_closing,
163            abort_handle,
164            component_id,
165            executor_id,
166        }
167    }
168
169    fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
170        if let Some(task_limiter) = &self.config.task_limiter {
171            let mut locks = Vec::new();
172            for _ in 0..self.config.batch_size {
173                if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
174                    locks.push(Some(permit));
175                } else {
176                    break;
177                }
178            }
179            locks
180        } else {
181            let mut vec = Vec::with_capacity(self.config.batch_size as usize);
182            for _ in 0..self.config.batch_size {
183                vec.push(None);
184            }
185            vec
186        }
187    }
188
189    #[cfg(feature = "test")]
190    pub async fn tick_test(
191        &self,
192        executed_at: DateTime<Utc>,
193        run_id: RunId,
194    ) -> Result<ExecutionProgress, ()> {
195        self.tick(executed_at, run_id).await
196    }
197
198    #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.config.executor_id, component_id = %self.config.component_id))]
199    async fn tick(
200        &self,
201        executed_at: DateTime<Utc>,
202        run_id: RunId,
203    ) -> Result<ExecutionProgress, ()> {
204        let locked_executions = {
205            let mut permits = self.acquire_task_permits();
206            if permits.is_empty() {
207                return Ok(ExecutionProgress::default());
208            }
209            let db_connection = self.db_pool.connection();
210            let lock_expires_at = executed_at + self.config.lock_expiry;
211            let locked_executions = db_connection
212                .lock_pending(
213                    permits.len(), // batch size
214                    executed_at,   // fetch expiring before now
215                    self.ffqns.clone(),
216                    executed_at, // created at
217                    self.config.component_id.clone(),
218                    self.config.executor_id,
219                    lock_expires_at,
220                    run_id,
221                )
222                .await
223                .map_err(|err| {
224                    warn!("lock_pending error {err:?}");
225                })?;
226            // Drop permits if too many were allocated.
227            while permits.len() > locked_executions.len() {
228                permits.pop();
229            }
230            assert_eq!(permits.len(), locked_executions.len());
231            locked_executions.into_iter().zip(permits)
232        };
233        let execution_deadline = executed_at + self.config.lock_expiry;
234
235        let mut executions = Vec::with_capacity(locked_executions.len());
236        for (locked_execution, permit) in locked_executions {
237            let execution_id = locked_execution.execution_id.clone();
238            let join_handle = {
239                let worker = self.worker.clone();
240                let db_pool = self.db_pool.clone();
241                let clock_fn = self.clock_fn.clone();
242                let run_id = locked_execution.run_id;
243                let worker_span = info_span!(parent: None, "worker",
244                    "otel.name" = format!("worker {}", locked_execution.ffqn),
245                    %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.config.executor_id, component_id = %self.config.component_id);
246                locked_execution.metadata.enrich(&worker_span);
247                tokio::spawn({
248                    let worker_span2 = worker_span.clone();
249                    async move {
250                        let _permit = permit;
251                        let res = Self::run_worker(
252                            worker,
253                            db_pool.as_ref(),
254                            execution_deadline,
255                            clock_fn,
256                            locked_execution,
257                            worker_span2,
258                        )
259                        .await;
260                        if let Err(db_error) = res {
261                            error!("Got DbError `{db_error:?}`, expecting watcher to mark execution as timed out");
262                        }
263                    }
264                    .instrument(worker_span)
265                })
266            };
267            executions.push((execution_id, join_handle));
268        }
269        Ok(ExecutionProgress { executions })
270    }
271
272    async fn run_worker(
273        worker: Arc<dyn Worker>,
274        db_pool: &dyn DbPool,
275        execution_deadline: DateTime<Utc>,
276        clock_fn: C,
277        locked_execution: LockedExecution,
278        worker_span: Span,
279    ) -> Result<(), DbError> {
280        debug!("Worker::run starting");
281        trace!(
282            version = %locked_execution.version,
283            params = ?locked_execution.params,
284            event_history = ?locked_execution.event_history,
285            "Worker::run starting"
286        );
287        let can_be_retried = ExecutionLog::can_be_retried_after(
288            locked_execution.temporary_event_count + 1,
289            locked_execution.max_retries,
290            locked_execution.retry_exp_backoff,
291        );
292        let unlock_expiry_on_limit_reached =
293            ExecutionLog::compute_retry_duration_when_retrying_forever(
294                locked_execution.temporary_event_count + 1,
295                locked_execution.retry_exp_backoff,
296            );
297        let ctx = WorkerContext {
298            execution_id: locked_execution.execution_id.clone(),
299            metadata: locked_execution.metadata,
300            ffqn: locked_execution.ffqn,
301            params: locked_execution.params,
302            event_history: locked_execution.event_history,
303            responses: locked_execution
304                .responses
305                .into_iter()
306                .map(|outer| outer.event)
307                .collect(),
308            version: locked_execution.version,
309            execution_deadline,
310            can_be_retried: can_be_retried.is_some(),
311            run_id: locked_execution.run_id,
312            worker_span,
313        };
314        let worker_result = worker.run(ctx).await;
315        trace!(?worker_result, "Worker::run finished");
316        let result_obtained_at = clock_fn.now();
317        match Self::worker_result_to_execution_event(
318            locked_execution.execution_id,
319            worker_result,
320            result_obtained_at,
321            locked_execution.parent,
322            can_be_retried,
323            unlock_expiry_on_limit_reached,
324        )? {
325            Some(append) => {
326                let db_connection = db_pool.connection();
327                trace!("Appending {append:?}");
328                append.append(db_connection.as_ref()).await
329            }
330            None => Ok(()),
331        }
332    }
333
334    /// Map the `WorkerError` to an temporary or a permanent failure.
335    fn worker_result_to_execution_event(
336        execution_id: ExecutionId,
337        worker_result: WorkerResult,
338        result_obtained_at: DateTime<Utc>,
339        parent: Option<(ExecutionId, JoinSetId)>,
340        can_be_retried: Option<Duration>,
341        unlock_expiry_on_limit_reached: Duration,
342    ) -> Result<Option<Append>, DbError> {
343        Ok(match worker_result {
344            WorkerResult::Ok(result, new_version, http_client_traces) => {
345                info!(
346                    "Execution finished: {}",
347                    result.as_pending_state_finished_result()
348                );
349                let child_finished =
350                    parent.map(
351                        |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
352                            parent_execution_id,
353                            parent_join_set,
354                            result: result.clone(),
355                        },
356                    );
357                let primary_event = AppendRequest {
358                    created_at: result_obtained_at,
359                    event: ExecutionEventInner::Finished {
360                        result,
361                        http_client_traces,
362                    },
363                };
364
365                Some(Append {
366                    created_at: result_obtained_at,
367                    primary_event,
368                    execution_id,
369                    version: new_version,
370                    child_finished,
371                })
372            }
373            WorkerResult::DbUpdatedByWorkerOrWatcher => None,
374            WorkerResult::Err(err) => {
375                let reason_full = err.to_string(); // WorkerError.display() usually contains a variant specific prefix + inner `reason`.
376                let (primary_event, child_finished, version) = match err {
377                    WorkerError::TemporaryTimeout {
378                        http_client_traces,
379                        version,
380                    } => {
381                        if let Some(duration) = can_be_retried {
382                            let backoff_expires_at = result_obtained_at + duration;
383                            info!(
384                                "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
385                            );
386                            (
387                                ExecutionEventInner::TemporarilyTimedOut {
388                                    backoff_expires_at,
389                                    http_client_traces,
390                                },
391                                None,
392                                version,
393                            )
394                        } else {
395                            info!("Permanent timeout");
396                            let result = SupportedFunctionReturnValue::ExecutionError(
397                                FinishedExecutionError::PermanentTimeout,
398                            );
399                            let child_finished =
400                                parent.map(|(parent_execution_id, parent_join_set)| {
401                                    ChildFinishedResponse {
402                                        parent_execution_id,
403                                        parent_join_set,
404                                        result: result.clone(),
405                                    }
406                                });
407                            (
408                                ExecutionEventInner::Finished {
409                                    result,
410                                    http_client_traces,
411                                },
412                                child_finished,
413                                version,
414                            )
415                        }
416                    }
417                    WorkerError::DbError(db_error) => {
418                        return Err(db_error);
419                    }
420                    WorkerError::ActivityTrap {
421                        reason: reason_inner,
422                        trap_kind,
423                        detail,
424                        version,
425                        http_client_traces,
426                    } => retry_or_fail(
427                        result_obtained_at,
428                        parent,
429                        can_be_retried,
430                        reason_full,
431                        reason_inner,
432                        trap_kind, // short reason like `trap` for logs
433                        detail,
434                        version,
435                        http_client_traces,
436                    ),
437                    WorkerError::ActivityPreopenedDirError {
438                        reason_kind,
439                        reason_inner,
440                        version,
441                    } => retry_or_fail(
442                        result_obtained_at,
443                        parent,
444                        can_be_retried,
445                        reason_full,
446                        reason_inner,
447                        reason_kind, // short reason for logs
448                        None,        // detail
449                        version,
450                        None, // http_client_traces
451                    ),
452                    WorkerError::ActivityReturnedError {
453                        detail,
454                        version,
455                        http_client_traces,
456                    } => {
457                        let duration = can_be_retried.expect(
458                            "ActivityReturnedError must not be returned when retries are exhausted",
459                        );
460                        let expires_at = result_obtained_at + duration;
461                        debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
462                        (
463                            ExecutionEventInner::TemporarilyFailed {
464                                backoff_expires_at: expires_at,
465                                reason_full: StrVariant::Static("activity returned error"), // is same as the variant's display message.
466                                reason_inner: StrVariant::Static("activity returned error"),
467                                detail, // contains the backtrace
468                                http_client_traces,
469                            },
470                            None,
471                            version,
472                        )
473                    }
474                    WorkerError::LimitReached {
475                        reason: inner_reason,
476                        version: new_version,
477                    } => {
478                        let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
479                        warn!(
480                            "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
481                        );
482                        (
483                            ExecutionEventInner::Unlocked {
484                                backoff_expires_at: expires_at,
485                                reason: StrVariant::from(reason_full),
486                            },
487                            None,
488                            new_version,
489                        )
490                    }
491                    WorkerError::FatalError(fatal_error, version) => {
492                        info!("Fatal worker error - {fatal_error:?}");
493                        let result = SupportedFunctionReturnValue::ExecutionError(
494                            FinishedExecutionError::from(fatal_error),
495                        );
496                        let child_finished =
497                            parent.map(|(parent_execution_id, parent_join_set)| {
498                                ChildFinishedResponse {
499                                    parent_execution_id,
500                                    parent_join_set,
501                                    result: result.clone(),
502                                }
503                            });
504                        (
505                            ExecutionEventInner::Finished {
506                                result,
507                                http_client_traces: None,
508                            },
509                            child_finished,
510                            version,
511                        )
512                    }
513                };
514                Some(Append {
515                    created_at: result_obtained_at,
516                    primary_event: AppendRequest {
517                        created_at: result_obtained_at,
518                        event: primary_event,
519                    },
520                    execution_id,
521                    version,
522                    child_finished,
523                })
524            }
525        })
526    }
527}
528
529#[expect(clippy::too_many_arguments)]
530fn retry_or_fail(
531    result_obtained_at: DateTime<Utc>,
532    parent: Option<(ExecutionId, JoinSetId)>,
533    can_be_retried: Option<Duration>,
534    reason_full: String,
535    reason_inner: String,
536    err_kind_for_logs: impl Display,
537    detail: Option<String>,
538    version: Version,
539    http_client_traces: Option<Vec<concepts::storage::http_client_trace::HttpClientTrace>>,
540) -> (ExecutionEventInner, Option<ChildFinishedResponse>, Version) {
541    if let Some(duration) = can_be_retried {
542        let expires_at = result_obtained_at + duration;
543        debug!(
544            "Retrying activity with `{err_kind_for_logs}` execution after {duration:?} at {expires_at}"
545        );
546        (
547            ExecutionEventInner::TemporarilyFailed {
548                backoff_expires_at: expires_at,
549                reason_full: StrVariant::from(reason_full),
550                reason_inner: StrVariant::from(reason_inner),
551                detail,
552                http_client_traces,
553            },
554            None,
555            version,
556        )
557    } else {
558        info!("Activity with `{err_kind_for_logs}` marked as permanent failure - {reason_inner}");
559        let result = SupportedFunctionReturnValue::ExecutionError(
560            FinishedExecutionError::PermanentFailure {
561                reason_inner,
562                reason_full,
563                kind: PermanentFailureKind::ActivityTrap,
564                detail,
565            },
566        );
567
568        let child_finished =
569            parent.map(
570                |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
571                    parent_execution_id,
572                    parent_join_set,
573                    result: result.clone(),
574                },
575            );
576        (
577            ExecutionEventInner::Finished {
578                result,
579                http_client_traces,
580            },
581            child_finished,
582            version,
583        )
584    }
585}
586
587#[derive(Debug, Clone)]
588pub(crate) struct ChildFinishedResponse {
589    pub(crate) parent_execution_id: ExecutionId,
590    pub(crate) parent_join_set: JoinSetId,
591    pub(crate) result: SupportedFunctionReturnValue,
592}
593
594#[derive(Debug, Clone)]
595pub(crate) struct Append {
596    pub(crate) created_at: DateTime<Utc>,
597    pub(crate) primary_event: AppendRequest,
598    pub(crate) execution_id: ExecutionId,
599    pub(crate) version: Version,
600    pub(crate) child_finished: Option<ChildFinishedResponse>,
601}
602
603impl Append {
604    pub(crate) async fn append(self, db_connection: &dyn DbConnection) -> Result<(), DbError> {
605        if let Some(child_finished) = self.child_finished {
606            assert_matches!(
607                &self.primary_event,
608                AppendRequest {
609                    event: ExecutionEventInner::Finished { .. },
610                    ..
611                }
612            );
613            let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
614            db_connection
615                .append_batch_respond_to_parent(
616                    derived.clone(),
617                    self.created_at,
618                    vec![self.primary_event],
619                    self.version.clone(),
620                    child_finished.parent_execution_id,
621                    JoinSetResponseEventOuter {
622                        created_at: self.created_at,
623                        event: JoinSetResponseEvent {
624                            join_set_id: child_finished.parent_join_set,
625                            event: JoinSetResponse::ChildExecutionFinished {
626                                child_execution_id: derived,
627                                // Since self.primary_event is a finished event, the version will remain the same.
628                                finished_version: self.version,
629                                result: child_finished.result,
630                            },
631                        },
632                    },
633                )
634                .await?;
635        } else {
636            db_connection
637                .append_batch(
638                    self.created_at,
639                    vec![self.primary_event],
640                    self.execution_id,
641                    self.version,
642                )
643                .await?;
644        }
645        Ok(())
646    }
647}
648
/// Scripted test worker: each `run` consumes the next expected version, event history and
/// canned result, asserting that the context matches.
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes, RETURN_TYPE_DUMMY,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");

    /// Scripted runs keyed by the version the worker expects; consumed from the back via
    /// `IndexMap::pop` (hence the `_rev` naming).
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    // Metadata of the single submittable function a `SimpleWorker` exports.
    fn exported_metadata(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
        [FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: RETURN_TYPE_DUMMY,
            extension: None,
            submittable: true,
        }]
    }

    impl SimpleWorker {
        /// Worker expecting a single run at version 2 with an empty event history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let scripted = IndexMap::from([(Version::new(2), (vec![], res))]);
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(scripted)))
        }

        /// Replaces the exported function name while keeping the scripted results.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            Self {
                exported: exported_metadata(ffqn.clone()),
                ffqn,
                worker_results_rev: self.worker_results_rev,
            }
        }

        /// Worker exporting `FFQN_SOME` and replaying `worker_results_rev`.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: exported_metadata(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            // Single expression on purpose: the lock guard lives until the end of the statement.
            let (expected_version, (expected_eh, worker_result)) =
                self.worker_results_rev.lock().unwrap().pop().unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}
728
729#[cfg(test)]
730mod tests {
731    use self::simple_worker::SimpleWorker;
732    use super::*;
733    use crate::{expired_timers_watcher, worker::WorkerResult};
734    use assert_matches::assert_matches;
735    use async_trait::async_trait;
736    use concepts::storage::{CreateRequest, JoinSetRequest};
737    use concepts::storage::{ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState};
738    use concepts::time::Now;
739    use concepts::{
740        ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, RETURN_TYPE_DUMMY,
741        SUPPORTED_RETURN_VALUE_OK_EMPTY, StrVariant, SupportedFunctionReturnValue, TrapKind,
742    };
743    use db_tests::Database;
744    use indexmap::IndexMap;
745    use simple_worker::FFQN_SOME;
746    use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
747    use test_utils::set_up;
748    use test_utils::sim_clock::{ConstClock, SimClock};
749
750    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
751
752    async fn tick_fn<W: Worker + Debug, C: ClockFn + 'static>(
753        config: ExecConfig,
754        clock_fn: C,
755        db_pool: Arc<dyn DbPool>,
756        worker: Arc<W>,
757        executed_at: DateTime<Utc>,
758    ) -> ExecutionProgress {
759        trace!("Ticking with {worker:?}");
760        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
761        let executor = ExecTask::new(worker, config, clock_fn, db_pool, ffqns);
762        let mut execution_progress = executor.tick(executed_at, RunId::generate()).await.unwrap();
763        loop {
764            execution_progress
765                .executions
766                .retain(|(_, abort_handle)| !abort_handle.is_finished());
767            if execution_progress.executions.is_empty() {
768                return execution_progress;
769            }
770            tokio::time::sleep(Duration::from_millis(10)).await;
771        }
772    }
773
774    #[tokio::test]
775    async fn execute_simple_lifecycle_tick_based_mem() {
776        let created_at = Now.now();
777        let (_guard, db_pool) = Database::Memory.set_up().await;
778        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
779        db_pool.close().await.unwrap();
780    }
781
782    #[tokio::test]
783    async fn execute_simple_lifecycle_tick_based_sqlite() {
784        let created_at = Now.now();
785        let (_guard, db_pool) = Database::Sqlite.set_up().await;
786        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
787        db_pool.close().await.unwrap();
788    }
789
790    async fn execute_simple_lifecycle_tick_based<C: ClockFn + 'static>(
791        pool: Arc<dyn DbPool>,
792        clock_fn: C,
793    ) {
794        set_up();
795        let created_at = clock_fn.now();
796        let exec_config = ExecConfig {
797            batch_size: 1,
798            lock_expiry: Duration::from_secs(1),
799            tick_sleep: Duration::from_millis(100),
800            component_id: ComponentId::dummy_activity(),
801            task_limiter: None,
802            executor_id: ExecutorId::generate(),
803        };
804
805        let execution_log = create_and_tick(
806            CreateAndTickConfig {
807                execution_id: ExecutionId::generate(),
808                created_at,
809                max_retries: 0,
810                executed_at: created_at,
811                retry_exp_backoff: Duration::ZERO,
812            },
813            clock_fn,
814            pool,
815            exec_config,
816            Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
817                SUPPORTED_RETURN_VALUE_OK_EMPTY,
818                Version::new(2),
819                None,
820            ))),
821            tick_fn,
822        )
823        .await;
824        assert_matches!(
825            execution_log.events.get(2).unwrap(),
826            ExecutionEvent {
827                event: ExecutionEventInner::Finished {
828                    result: SupportedFunctionReturnValue::Ok { ok: None },
829                    http_client_traces: None
830                },
831                created_at: _,
832                backtrace_id: None,
833            }
834        );
835    }
836
837    #[tokio::test]
838    async fn stochastic_execute_simple_lifecycle_task_based_mem() {
839        set_up();
840        let created_at = Now.now();
841        let clock_fn = ConstClock(created_at);
842        let (_guard, db_pool) = Database::Memory.set_up().await;
843        let exec_config = ExecConfig {
844            batch_size: 1,
845            lock_expiry: Duration::from_secs(1),
846            tick_sleep: Duration::ZERO,
847            component_id: ComponentId::dummy_activity(),
848            task_limiter: None,
849            executor_id: ExecutorId::generate(),
850        };
851
852        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
853            SUPPORTED_RETURN_VALUE_OK_EMPTY,
854            Version::new(2),
855            None,
856        )));
857        let exec_task = ExecTask::spawn_new(
858            worker.clone(),
859            exec_config.clone(),
860            clock_fn,
861            db_pool.clone(),
862        );
863
864        let execution_log = create_and_tick(
865            CreateAndTickConfig {
866                execution_id: ExecutionId::generate(),
867                created_at,
868                max_retries: 0,
869                executed_at: created_at,
870                retry_exp_backoff: Duration::ZERO,
871            },
872            clock_fn,
873            db_pool.clone(),
874            exec_config,
875            worker,
876            |_, _, _, _, _| async {
877                tokio::time::sleep(Duration::from_secs(1)).await; // FIXME: non determinism, possible race
878                ExecutionProgress::default()
879            },
880        )
881        .await;
882        exec_task.close().await;
883        db_pool.close().await.unwrap();
884        assert_matches!(
885            execution_log.events.get(2).unwrap(),
886            ExecutionEvent {
887                event: ExecutionEventInner::Finished {
888                    result: SupportedFunctionReturnValue::Ok { ok: None },
889                    http_client_traces: None
890                },
891                created_at: _,
892                backtrace_id: None,
893            }
894        );
895    }
896
897    struct CreateAndTickConfig {
898        execution_id: ExecutionId,
899        created_at: DateTime<Utc>,
900        max_retries: u32,
901        executed_at: DateTime<Utc>,
902        retry_exp_backoff: Duration,
903    }
904
905    async fn create_and_tick<
906        W: Worker,
907        C: ClockFn,
908        T: FnMut(ExecConfig, C, Arc<dyn DbPool>, Arc<W>, DateTime<Utc>) -> F,
909        F: Future<Output = ExecutionProgress>,
910    >(
911        config: CreateAndTickConfig,
912        clock_fn: C,
913        db_pool: Arc<dyn DbPool>,
914        exec_config: ExecConfig,
915        worker: Arc<W>,
916        mut tick: T,
917    ) -> ExecutionLog {
918        // Create an execution
919        let db_connection = db_pool.connection();
920        db_connection
921            .create(CreateRequest {
922                created_at: config.created_at,
923                execution_id: config.execution_id.clone(),
924                ffqn: FFQN_SOME,
925                params: Params::empty(),
926                parent: None,
927                metadata: concepts::ExecutionMetadata::empty(),
928                scheduled_at: config.created_at,
929                retry_exp_backoff: config.retry_exp_backoff,
930                max_retries: config.max_retries,
931                component_id: ComponentId::dummy_activity(),
932                scheduled_by: None,
933            })
934            .await
935            .unwrap();
936        // execute!
937        tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
938        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
939        debug!("Execution history after tick: {execution_log:?}");
940        // check that DB contains Created and Locked events.
941        assert_matches!(
942            execution_log.events.first().unwrap(),
943            ExecutionEvent {
944                event: ExecutionEventInner::Created { .. },
945                created_at: actually_created_at,
946                backtrace_id: None,
947            }
948            if config.created_at == *actually_created_at
949        );
950        let locked_at = assert_matches!(
951            execution_log.events.get(1).unwrap(),
952            ExecutionEvent {
953                event: ExecutionEventInner::Locked { .. },
954                created_at: locked_at,
955                backtrace_id: None,
956            } if config.created_at <= *locked_at
957            => *locked_at
958        );
959        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
960            event: _,
961            created_at: executed_at,
962            backtrace_id: None,
963        } if *executed_at >= locked_at);
964        execution_log
965    }
966
967    #[tokio::test]
968    async fn activity_trap_should_trigger_an_execution_retry() {
969        set_up();
970        let sim_clock = SimClock::default();
971        let (_guard, db_pool) = Database::Memory.set_up().await;
972        let exec_config = ExecConfig {
973            batch_size: 1,
974            lock_expiry: Duration::from_secs(1),
975            tick_sleep: Duration::ZERO,
976            component_id: ComponentId::dummy_activity(),
977            task_limiter: None,
978            executor_id: ExecutorId::generate(),
979        };
980        let expected_reason = "error reason";
981        let expected_detail = "error detail";
982        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
983            WorkerError::ActivityTrap {
984                reason: expected_reason.to_string(),
985                trap_kind: concepts::TrapKind::Trap,
986                detail: Some(expected_detail.to_string()),
987                version: Version::new(2),
988                http_client_traces: None,
989            },
990        )));
991        let retry_exp_backoff = Duration::from_millis(100);
992        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
993        let execution_log = create_and_tick(
994            CreateAndTickConfig {
995                execution_id: ExecutionId::generate(),
996                created_at: sim_clock.now(),
997                max_retries: 1,
998                executed_at: sim_clock.now(),
999                retry_exp_backoff,
1000            },
1001            sim_clock.clone(),
1002            db_pool.clone(),
1003            exec_config.clone(),
1004            worker,
1005            tick_fn,
1006        )
1007        .await;
1008        assert_eq!(3, execution_log.events.len());
1009        {
1010            let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
1011                &execution_log.events.get(2).unwrap(),
1012                ExecutionEvent {
1013                    event: ExecutionEventInner::TemporarilyFailed {
1014                        reason_inner,
1015                        reason_full,
1016                        detail,
1017                        backoff_expires_at,
1018                        http_client_traces: None,
1019                    },
1020                    created_at: at,
1021                    backtrace_id: None,
1022                }
1023                => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
1024            );
1025            assert_eq!(expected_reason, reason_inner.deref());
1026            assert_eq!(
1027                format!("activity trap: {expected_reason}"),
1028                reason_full.deref()
1029            );
1030            assert_eq!(Some(expected_detail), detail.as_deref());
1031            assert_eq!(at, sim_clock.now());
1032            assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
1033        }
1034        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
1035            std::sync::Mutex::new(IndexMap::from([(
1036                Version::new(4),
1037                (
1038                    vec![],
1039                    WorkerResult::Ok(SUPPORTED_RETURN_VALUE_OK_EMPTY, Version::new(4), None),
1040                ),
1041            )])),
1042        )));
1043        // noop until `retry_exp_backoff` expires
1044        assert!(
1045            tick_fn(
1046                exec_config.clone(),
1047                sim_clock.clone(),
1048                db_pool.clone(),
1049                worker.clone(),
1050                sim_clock.now(),
1051            )
1052            .await
1053            .executions
1054            .is_empty()
1055        );
1056        // tick again to finish the execution
1057        sim_clock.move_time_forward(retry_exp_backoff);
1058        tick_fn(
1059            exec_config,
1060            sim_clock.clone(),
1061            db_pool.clone(),
1062            worker,
1063            sim_clock.now(),
1064        )
1065        .await;
1066        let execution_log = {
1067            let db_connection = db_pool.connection();
1068            db_connection
1069                .get(&execution_log.execution_id)
1070                .await
1071                .unwrap()
1072        };
1073        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
1074        assert_matches!(
1075            execution_log.events.get(3).unwrap(),
1076            ExecutionEvent {
1077                event: ExecutionEventInner::Locked { .. },
1078                created_at: at,
1079                backtrace_id: None,
1080            } if *at == sim_clock.now()
1081        );
1082        assert_matches!(
1083            execution_log.events.get(4).unwrap(),
1084            ExecutionEvent {
1085                event: ExecutionEventInner::Finished {
1086                    result: SupportedFunctionReturnValue::Ok{ok:None},
1087                    http_client_traces: None
1088                },
1089                created_at: finished_at,
1090                backtrace_id: None,
1091            } if *finished_at == sim_clock.now()
1092        );
1093        db_pool.close().await.unwrap();
1094    }
1095
1096    #[tokio::test]
1097    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1098        set_up();
1099        let created_at = Now.now();
1100        let clock_fn = ConstClock(created_at);
1101        let (_guard, db_pool) = Database::Memory.set_up().await;
1102        let exec_config = ExecConfig {
1103            batch_size: 1,
1104            lock_expiry: Duration::from_secs(1),
1105            tick_sleep: Duration::ZERO,
1106            component_id: ComponentId::dummy_activity(),
1107            task_limiter: None,
1108            executor_id: ExecutorId::generate(),
1109        };
1110
1111        let expected_reason = "error reason";
1112        let expected_detail = "error detail";
1113        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1114            WorkerError::ActivityTrap {
1115                reason: expected_reason.to_string(),
1116                trap_kind: concepts::TrapKind::Trap,
1117                detail: Some(expected_detail.to_string()),
1118                version: Version::new(2),
1119                http_client_traces: None,
1120            },
1121        )));
1122        let execution_log = create_and_tick(
1123            CreateAndTickConfig {
1124                execution_id: ExecutionId::generate(),
1125                created_at,
1126                max_retries: 0,
1127                executed_at: created_at,
1128                retry_exp_backoff: Duration::ZERO,
1129            },
1130            clock_fn,
1131            db_pool.clone(),
1132            exec_config.clone(),
1133            worker,
1134            tick_fn,
1135        )
1136        .await;
1137        assert_eq!(3, execution_log.events.len());
1138        let (reason, kind, detail) = assert_matches!(
1139            &execution_log.events.get(2).unwrap(),
1140            ExecutionEvent {
1141                event: ExecutionEventInner::Finished{
1142                    result: SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_}),
1143                    http_client_traces: None
1144                },
1145                created_at: at,
1146                backtrace_id: None,
1147            } if *at == created_at
1148            => (reason_inner, kind, detail)
1149        );
1150        assert_eq!(expected_reason, *reason);
1151        assert_eq!(Some(expected_detail), detail.as_deref());
1152        assert_eq!(PermanentFailureKind::ActivityTrap, *kind);
1153
1154        db_pool.close().await.unwrap();
1155    }
1156
1157    #[tokio::test]
1158    async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1159        let worker_error = WorkerError::ActivityTrap {
1160            reason: "error reason".to_string(),
1161            trap_kind: TrapKind::Trap,
1162            detail: Some("detail".to_string()),
1163            version: Version::new(2),
1164            http_client_traces: None,
1165        };
1166        let expected_child_err = FinishedExecutionError::PermanentFailure {
1167            reason_full: "activity trap: error reason".to_string(),
1168            reason_inner: "error reason".to_string(),
1169            kind: PermanentFailureKind::ActivityTrap,
1170            detail: Some("detail".to_string()),
1171        };
1172        child_execution_permanently_failed_should_notify_parent(
1173            WorkerResult::Err(worker_error),
1174            expected_child_err,
1175        )
1176        .await;
1177    }
1178
1179    // TODO: Add test for WorkerError::TemporaryTimeout
1180
1181    #[tokio::test]
1182    async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1183        let expected_child_err = FinishedExecutionError::PermanentTimeout;
1184        child_execution_permanently_failed_should_notify_parent(
1185            WorkerResult::DbUpdatedByWorkerOrWatcher,
1186            expected_child_err,
1187        )
1188        .await;
1189    }
1190
    /// Shared scenario: a parent execution spawns a one-off child, the child finishes with
    /// `expected_child_err`, and the parent must be woken up with a `ChildExecutionFinished`
    /// response.
    ///
    /// * `worker_result` - what the child worker returns on its single run.
    /// * `expected_child_err` - the error the child's `Finished` event must carry. When it is
    ///   `PermanentTimeout`, time is moved past the lock expiry and the expired-timers watcher
    ///   is ticked so that it, rather than the executor, finishes the child.
    async fn child_execution_permanently_failed_should_notify_parent(
        worker_result: WorkerResult,
        expected_child_err: FinishedExecutionError,
    ) {
        use concepts::storage::JoinSetResponseEventOuter;
        const LOCK_EXPIRY: Duration = Duration::from_secs(1);

        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;

        // The parent worker reports `DbUpdatedByWorkerOrWatcher`, so the executor only locks
        // the parent; the join set events below stand in for what a real worker would write.
        let parent_worker = Arc::new(SimpleWorker::with_single_result(
            WorkerResult::DbUpdatedByWorkerOrWatcher,
        ));
        let parent_execution_id = ExecutionId::generate();
        db_pool
            .connection()
            .create(CreateRequest {
                created_at: sim_clock.now(),
                execution_id: parent_execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: Duration::ZERO,
                max_retries: 0,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();
        tick_fn(
            ExecConfig {
                batch_size: 1,
                lock_expiry: LOCK_EXPIRY,
                tick_sleep: Duration::ZERO,
                component_id: ComponentId::dummy_activity(),
                task_limiter: None,
                executor_id: ExecutorId::generate(),
            },
            sim_clock.clone(),
            db_pool.clone(),
            parent_worker,
            sim_clock.now(),
        )
        .await;

        let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
        let child_execution_id = parent_execution_id.next_level(&join_set_id);
        // executor does not append anything, this should have been written by the worker:
        // create the join set, request the child, and block the parent on `JoinNext`, while
        // creating the child execution itself in the same batch.
        {
            let child = CreateRequest {
                created_at: sim_clock.now(),
                execution_id: ExecutionId::Derived(child_execution_id.clone()),
                ffqn: FFQN_CHILD,
                params: Params::empty(),
                parent: Some((parent_execution_id.clone(), join_set_id.clone())),
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: Duration::ZERO,
                max_retries: 0,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            };
            let current_time = sim_clock.now();
            let join_set = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinSetCreate {
                        join_set_id: join_set_id.clone(),
                        closing_strategy: ClosingStrategy::Complete,
                    },
                },
            };
            let child_exec_req = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinSetRequest {
                        join_set_id: join_set_id.clone(),
                        request: JoinSetRequest::ChildExecutionRequest {
                            child_execution_id: child_execution_id.clone(),
                        },
                    },
                },
            };
            let join_next = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinNext {
                        join_set_id: join_set_id.clone(),
                        run_expires_at: sim_clock.now(),
                        closing: false,
                        requested_ffqn: Some(FFQN_CHILD),
                    },
                },
            };
            // NOTE(review): Version 2 presumably = Created (0) + Locked (1) on the parent;
            // confirm if the parent's lifecycle above ever changes.
            db_pool
                .connection()
                .append_batch_create_new_execution(
                    current_time,
                    vec![join_set, child_exec_req, join_next],
                    parent_execution_id.clone(),
                    Version::new(2),
                    vec![child],
                )
                .await
                .unwrap();
        }

        let child_worker =
            Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));

        // execute the child
        tick_fn(
            ExecConfig {
                batch_size: 1,
                lock_expiry: LOCK_EXPIRY,
                tick_sleep: Duration::ZERO,
                component_id: ComponentId::dummy_activity(),
                task_limiter: None,
                executor_id: ExecutorId::generate(),
            },
            sim_clock.clone(),
            db_pool.clone(),
            child_worker,
            sim_clock.now(),
        )
        .await;
        if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
            // In case of timeout, let the timers watcher handle it
            sim_clock.move_time_forward(LOCK_EXPIRY);
            expired_timers_watcher::tick(db_pool.connection().as_ref(), sim_clock.now())
                .await
                .unwrap();
        }
        // The child must be finished with exactly the expected error.
        let child_log = db_pool
            .connection()
            .get(&ExecutionId::Derived(child_execution_id.clone()))
            .await
            .unwrap();
        assert!(child_log.pending_state.is_finished());
        assert_eq!(
            Version(2),
            child_log.next_version,
            "created = 0, locked = 1, with_single_result = 2"
        );
        assert_eq!(
            ExecutionEventInner::Finished {
                result: SupportedFunctionReturnValue::ExecutionError(expected_child_err),
                http_client_traces: None
            },
            child_log.last_event().event
        );
        // The parent must be pending again, with the child's completion as its latest response.
        let parent_log = db_pool
            .connection()
            .get(&parent_execution_id)
            .await
            .unwrap();
        assert_matches!(
            parent_log.pending_state,
            PendingState::PendingAt {
                scheduled_at
            } if scheduled_at == sim_clock.now(),
            "parent should be back to pending"
        );
        let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
            parent_log.responses.last(),
            Some(JoinSetResponseEventOuter{
                created_at: at,
                event: JoinSetResponseEvent{
                    join_set_id: found_join_set_id,
                    event: JoinSetResponse::ChildExecutionFinished {
                        child_execution_id: found_child_execution_id,
                        finished_version,
                        result: found_result,
                    }
                }
            })
             if *at == sim_clock.now()
            => (found_join_set_id, found_child_execution_id, finished_version, found_result)
        );
        assert_eq!(join_set_id, *found_join_set_id);
        assert_eq!(child_execution_id, *found_child_execution_id);
        assert_eq!(child_log.next_version, *child_finished_version);
        // Only the variant is checked here, not the error payload.
        assert_matches!(
            found_result,
            SupportedFunctionReturnValue::ExecutionError(_)
        );

        db_pool.close().await.unwrap();
    }
1383
    /// Test worker that sleeps for `duration` (tokio time) before succeeding with `result`.
    /// Used to simulate an execution that outlives its lock.
    #[derive(Clone, Debug)]
    struct SleepyWorker {
        /// How long `run` sleeps before returning.
        duration: Duration,
        /// Value returned as the successful result after the sleep.
        result: SupportedFunctionReturnValue,
        /// Metadata of the single function this worker exports.
        exported: [FunctionMetadata; 1],
    }
1390
1391    #[async_trait]
1392    impl Worker for SleepyWorker {
1393        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1394            tokio::time::sleep(self.duration).await;
1395            WorkerResult::Ok(self.result.clone(), ctx.version, None)
1396        }
1397
1398        fn exported_functions(&self) -> &[FunctionMetadata] {
1399            &self.exported
1400        }
1401    }
1402
1403    #[tokio::test]
1404    async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1405        set_up();
1406        let sim_clock = SimClock::default();
1407        let (_guard, db_pool) = Database::Memory.set_up().await;
1408        let lock_expiry = Duration::from_millis(100);
1409        let exec_config = ExecConfig {
1410            batch_size: 1,
1411            lock_expiry,
1412            tick_sleep: Duration::ZERO,
1413            component_id: ComponentId::dummy_activity(),
1414            task_limiter: None,
1415            executor_id: ExecutorId::generate(),
1416        };
1417
1418        let worker = Arc::new(SleepyWorker {
1419            duration: lock_expiry + Duration::from_millis(1), // sleep more than allowed by the lock expiry
1420            result: SUPPORTED_RETURN_VALUE_OK_EMPTY,
1421            exported: [FunctionMetadata {
1422                ffqn: FFQN_SOME,
1423                parameter_types: ParameterTypes::default(),
1424                return_type: RETURN_TYPE_DUMMY,
1425                extension: None,
1426                submittable: true,
1427            }],
1428        });
1429        // Create an execution
1430        let execution_id = ExecutionId::generate();
1431        let timeout_duration = Duration::from_millis(300);
1432        let db_connection = db_pool.connection();
1433        db_connection
1434            .create(CreateRequest {
1435                created_at: sim_clock.now(),
1436                execution_id: execution_id.clone(),
1437                ffqn: FFQN_SOME,
1438                params: Params::empty(),
1439                parent: None,
1440                metadata: concepts::ExecutionMetadata::empty(),
1441                scheduled_at: sim_clock.now(),
1442                retry_exp_backoff: timeout_duration,
1443                max_retries: 1,
1444                component_id: ComponentId::dummy_activity(),
1445                scheduled_by: None,
1446            })
1447            .await
1448            .unwrap();
1449
1450        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
1451        let executor = ExecTask::new(
1452            worker,
1453            exec_config,
1454            sim_clock.clone(),
1455            db_pool.clone(),
1456            ffqns,
1457        );
1458        let mut first_execution_progress = executor
1459            .tick(sim_clock.now(), RunId::generate())
1460            .await
1461            .unwrap();
1462        assert_eq!(1, first_execution_progress.executions.len());
1463        // Started hanging, wait for lock expiry.
1464        sim_clock.move_time_forward(lock_expiry);
1465        // cleanup should be called
1466        let now_after_first_lock_expiry = sim_clock.now();
1467        {
1468            debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1469            let cleanup_progress = executor
1470                .tick(now_after_first_lock_expiry, RunId::generate())
1471                .await
1472                .unwrap();
1473            assert!(cleanup_progress.executions.is_empty());
1474        }
1475        {
1476            let expired_locks = expired_timers_watcher::tick(
1477                db_pool.connection().as_ref(),
1478                now_after_first_lock_expiry,
1479            )
1480            .await
1481            .unwrap()
1482            .expired_locks;
1483            assert_eq!(1, expired_locks);
1484        }
1485        assert!(
1486            !first_execution_progress
1487                .executions
1488                .pop()
1489                .unwrap()
1490                .1
1491                .is_finished()
1492        );
1493
1494        let execution_log = db_connection.get(&execution_id).await.unwrap();
1495        let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1496        assert_matches!(
1497            &execution_log.events.get(2).unwrap(),
1498            ExecutionEvent {
1499                event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
1500                created_at: at,
1501                backtrace_id: None,
1502            } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1503        );
1504        assert_eq!(
1505            PendingState::PendingAt {
1506                scheduled_at: expected_first_timeout_expiry
1507            },
1508            execution_log.pending_state
1509        );
1510        sim_clock.move_time_forward(timeout_duration);
1511        let now_after_first_timeout = sim_clock.now();
1512        debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1513
1514        let mut second_execution_progress = executor
1515            .tick(now_after_first_timeout, RunId::generate())
1516            .await
1517            .unwrap();
1518        assert_eq!(1, second_execution_progress.executions.len());
1519
1520        // Started hanging, wait for lock expiry.
1521        sim_clock.move_time_forward(lock_expiry);
1522        // cleanup should be called
1523        let now_after_second_lock_expiry = sim_clock.now();
1524        debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1525        {
1526            let cleanup_progress = executor
1527                .tick(now_after_second_lock_expiry, RunId::generate())
1528                .await
1529                .unwrap();
1530            assert!(cleanup_progress.executions.is_empty());
1531        }
1532        {
1533            let expired_locks = expired_timers_watcher::tick(
1534                db_pool.connection().as_ref(),
1535                now_after_second_lock_expiry,
1536            )
1537            .await
1538            .unwrap()
1539            .expired_locks;
1540            assert_eq!(1, expired_locks);
1541        }
1542        assert!(
1543            !second_execution_progress
1544                .executions
1545                .pop()
1546                .unwrap()
1547                .1
1548                .is_finished()
1549        );
1550
1551        drop(db_connection);
1552        drop(executor);
1553        db_pool.close().await.unwrap();
1554    }
1555}