obeli_sk_executor/
executor.rs

1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::prefixed_ulid::RunId;
5use concepts::storage::{
6    AppendRequest, DbErrorGeneric, DbErrorWrite, DbExecutor, ExecutionLog, JoinSetResponseEvent,
7    JoinSetResponseEventOuter, LockedExecution,
8};
9use concepts::time::{ClockFn, Sleep};
10use concepts::{ComponentId, FunctionMetadata, StrVariant, SupportedFunctionReturnValue};
11use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
12use concepts::{
13    FinishedExecutionError,
14    storage::{ExecutionEventInner, JoinSetResponse, Version},
15};
16use concepts::{JoinSetId, PermanentFailureKind};
17use std::fmt::Display;
18use std::{
19    sync::{
20        Arc,
21        atomic::{AtomicBool, Ordering},
22    },
23    time::Duration,
24};
25use tokio::task::{AbortHandle, JoinHandle};
26use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
27
/// Configuration of a single executor task.
#[derive(Debug, Clone)]
pub struct ExecConfig {
    /// Added to the tick's `executed_at` to compute when locks taken in that tick expire.
    pub lock_expiry: Duration,
    /// Duration handed to the `Sleep` implementation while waiting for pending executions.
    pub tick_sleep: Duration,
    /// Upper bound on the number of executions locked in one tick.
    pub batch_size: u32,
    /// Component id passed to the database when locking and recorded in tracing spans.
    pub component_id: ComponentId,
    /// Optional semaphore; when set, every spawned worker task must hold one of its permits.
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
    /// Executor id passed to the database when locking and recorded in tracing spans.
    pub executor_id: ExecutorId,
}
37
/// Executor bound to one worker: locks pending executions in the database and runs them
/// on the worker.
pub struct ExecTask<C: ClockFn> {
    /// Worker that runs every locked execution.
    worker: Arc<dyn Worker>,
    config: ExecConfig,
    clock_fn: C, // Used for obtaining current time when the execution finishes.
    /// Database handle used for locking pending executions and appending their results.
    db_exec: Arc<dyn DbExecutor>,
    /// Functions whose pending executions this task locks.
    ffqns: Arc<[FunctionFqn]>,
}
45
/// Executions started by a single tick, each paired with the join handle of its spawned
/// worker task.
#[derive(derive_more::Debug, Default)]
pub struct ExecutionProgress {
    #[debug(skip)]
    // Only read by `wait_for_tasks`, which is compiled with the `test` feature.
    #[allow(dead_code)]
    executions: Vec<(ExecutionId, JoinHandle<()>)>,
}
52
53impl ExecutionProgress {
54    #[cfg(feature = "test")]
55    pub async fn wait_for_tasks(self) -> Vec<ExecutionId> {
56        let mut vec = Vec::new();
57        for (exe, join_handle) in self.executions {
58            vec.push(exe);
59            join_handle.await.unwrap();
60        }
61        vec
62    }
63}
64
/// Handle to an executor task spawned by `ExecTask::spawn_new`.
///
/// `close` stops the task gracefully; dropping the handle aborts the task if it is still
/// running.
#[derive(derive_more::Debug)]
pub struct ExecutorTaskHandle {
    /// Flag read by the executor loop before each iteration; once set, the loop exits.
    #[debug(skip)]
    is_closing: Arc<AtomicBool>,
    /// Used to observe completion of the executor task and to abort it on drop.
    #[debug(skip)]
    abort_handle: AbortHandle,
    component_id: ComponentId,
    executor_id: ExecutorId,
}
74
75impl ExecutorTaskHandle {
76    #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
77    pub async fn close(&self) {
78        trace!("Gracefully closing");
79        self.is_closing.store(true, Ordering::Relaxed);
80        while !self.abort_handle.is_finished() {
81            tokio::time::sleep(Duration::from_millis(1)).await;
82        }
83        debug!("Gracefully closed");
84    }
85}
86
87impl Drop for ExecutorTaskHandle {
88    #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
89    fn drop(&mut self) {
90        if self.abort_handle.is_finished() {
91            return;
92        }
93        warn!("Aborting the executor task");
94        self.abort_handle.abort();
95    }
96}
97
/// Public wrapper around `extract_exported_ffqns_noext`, compiled only with the `test`
/// feature.
#[cfg(feature = "test")]
pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    extract_exported_ffqns_noext(worker)
}
102
103fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
104    worker
105        .exported_functions()
106        .iter()
107        .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
108        .collect::<Arc<_>>()
109}
110
111impl<C: ClockFn + 'static> ExecTask<C> {
    /// Creates an executor task from its parts without spawning the executor loop.
    ///
    /// Compiled only with the `test` feature.
    #[cfg(feature = "test")]
    pub fn new(
        worker: Arc<dyn Worker>,
        config: ExecConfig,
        clock_fn: C,
        db_exec: Arc<dyn DbExecutor>,
        ffqns: Arc<[FunctionFqn]>,
    ) -> Self {
        Self {
            worker,
            config,
            clock_fn,
            db_exec,
            ffqns,
        }
    }
128
129    #[cfg(feature = "test")]
130    pub fn new_all_ffqns_test(
131        worker: Arc<dyn Worker>,
132        config: ExecConfig,
133        clock_fn: C,
134        db_exec: Arc<dyn DbExecutor>,
135    ) -> Self {
136        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
137        Self {
138            worker,
139            config,
140            clock_fn,
141            db_exec,
142            ffqns,
143        }
144    }
145
146    pub fn spawn_new(
147        worker: Arc<dyn Worker>,
148        config: ExecConfig,
149        clock_fn: C,
150        db_exec: Arc<dyn DbExecutor>,
151        sleep: impl Sleep + 'static,
152    ) -> ExecutorTaskHandle {
153        let is_closing = Arc::new(AtomicBool::default());
154        let is_closing_inner = is_closing.clone();
155        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
156        let component_id = config.component_id.clone();
157        let executor_id = config.executor_id;
158        let abort_handle = tokio::spawn(async move {
159            debug!(executor_id = %config.executor_id, component_id = %config.component_id, "Spawned executor");
160            let task = ExecTask {
161                worker,
162                config,
163                db_exec,
164                ffqns: ffqns.clone(),
165                clock_fn: clock_fn.clone(),
166            };
167            while !is_closing_inner.load(Ordering::Relaxed) {
168                let _ = task.tick(clock_fn.now(), RunId::generate()).await;
169
170                task.db_exec
171                    .wait_for_pending(clock_fn.now(), ffqns.clone(), {
172                        let sleep = sleep.clone();
173                        Box::pin(async move { sleep.sleep(task.config.tick_sleep).await })})
174                    .await;
175            }
176        })
177        .abort_handle();
178        ExecutorTaskHandle {
179            is_closing,
180            abort_handle,
181            component_id,
182            executor_id,
183        }
184    }
185
186    fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
187        if let Some(task_limiter) = &self.config.task_limiter {
188            let mut locks = Vec::new();
189            for _ in 0..self.config.batch_size {
190                if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
191                    locks.push(Some(permit));
192                } else {
193                    break;
194                }
195            }
196            locks
197        } else {
198            let mut vec = Vec::with_capacity(self.config.batch_size as usize);
199            for _ in 0..self.config.batch_size {
200                vec.push(None);
201            }
202            vec
203        }
204    }
205
    /// Runs a single `tick` and returns its progress without waiting for the spawned
    /// worker tasks. Compiled only with the `test` feature.
    ///
    /// # Panics
    /// Panics if the tick returns an error.
    #[cfg(feature = "test")]
    pub async fn tick_test(&self, executed_at: DateTime<Utc>, run_id: RunId) -> ExecutionProgress {
        self.tick(executed_at, run_id).await.unwrap()
    }
210
211    #[cfg(feature = "test")]
212    pub async fn tick_test_await(
213        &self,
214        executed_at: DateTime<Utc>,
215        run_id: RunId,
216    ) -> Vec<ExecutionId> {
217        self.tick(executed_at, run_id)
218            .await
219            .unwrap()
220            .wait_for_tasks()
221            .await
222    }
223
224    #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.config.executor_id, component_id = %self.config.component_id))]
225    async fn tick(
226        &self,
227        executed_at: DateTime<Utc>,
228        run_id: RunId,
229    ) -> Result<ExecutionProgress, DbErrorGeneric> {
230        let locked_executions = {
231            let mut permits = self.acquire_task_permits();
232            if permits.is_empty() {
233                return Ok(ExecutionProgress::default());
234            }
235            let lock_expires_at = executed_at + self.config.lock_expiry;
236            let locked_executions = self
237                .db_exec
238                .lock_pending(
239                    permits.len(), // batch size
240                    executed_at,   // fetch expiring before now
241                    self.ffqns.clone(),
242                    executed_at, // created at
243                    self.config.component_id.clone(),
244                    self.config.executor_id,
245                    lock_expires_at,
246                    run_id,
247                )
248                .await?;
249            // Drop permits if too many were allocated.
250            while permits.len() > locked_executions.len() {
251                permits.pop();
252            }
253            assert_eq!(permits.len(), locked_executions.len());
254            locked_executions.into_iter().zip(permits)
255        };
256        let execution_deadline = executed_at + self.config.lock_expiry;
257
258        let mut executions = Vec::with_capacity(locked_executions.len());
259        for (locked_execution, permit) in locked_executions {
260            let execution_id = locked_execution.execution_id.clone();
261            let join_handle = {
262                let worker = self.worker.clone();
263                let db = self.db_exec.clone();
264                let clock_fn = self.clock_fn.clone();
265                let run_id = locked_execution.run_id;
266                let worker_span = info_span!(parent: None, "worker",
267                    "otel.name" = format!("worker {}", locked_execution.ffqn),
268                    %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.config.executor_id, component_id = %self.config.component_id);
269                locked_execution.metadata.enrich(&worker_span);
270                tokio::spawn({
271                    let worker_span2 = worker_span.clone();
272                    async move {
273                        let _permit = permit;
274                        let res = Self::run_worker(
275                            worker,
276                            db.as_ref(),
277                            execution_deadline,
278                            clock_fn,
279                            locked_execution,
280                            worker_span2,
281                        )
282                        .await;
283                        if let Err(db_error) = res {
284                            error!("Got db error `{db_error:?}`, expecting watcher to mark execution as timed out");
285                        }
286                    }
287                    .instrument(worker_span)
288                })
289            };
290            executions.push((execution_id, join_handle));
291        }
292        Ok(ExecutionProgress { executions })
293    }
294
295    async fn run_worker(
296        worker: Arc<dyn Worker>,
297        db_exec: &dyn DbExecutor,
298        execution_deadline: DateTime<Utc>,
299        clock_fn: C,
300        locked_execution: LockedExecution,
301        worker_span: Span,
302    ) -> Result<(), DbErrorWrite> {
303        debug!("Worker::run starting");
304        trace!(
305            version = %locked_execution.next_version,
306            params = ?locked_execution.params,
307            event_history = ?locked_execution.event_history,
308            "Worker::run starting"
309        );
310        let can_be_retried = ExecutionLog::can_be_retried_after(
311            locked_execution.intermittent_event_count + 1,
312            locked_execution.max_retries,
313            locked_execution.retry_exp_backoff,
314        );
315        let unlock_expiry_on_limit_reached =
316            ExecutionLog::compute_retry_duration_when_retrying_forever(
317                locked_execution.intermittent_event_count + 1,
318                locked_execution.retry_exp_backoff,
319            );
320        let ctx = WorkerContext {
321            execution_id: locked_execution.execution_id.clone(),
322            metadata: locked_execution.metadata,
323            ffqn: locked_execution.ffqn,
324            params: locked_execution.params,
325            event_history: locked_execution.event_history,
326            responses: locked_execution
327                .responses
328                .into_iter()
329                .map(|outer| outer.event)
330                .collect(),
331            version: locked_execution.next_version,
332            execution_deadline,
333            can_be_retried: can_be_retried.is_some(),
334            run_id: locked_execution.run_id,
335            worker_span,
336        };
337        let worker_result = worker.run(ctx).await;
338        trace!(?worker_result, "Worker::run finished");
339        let result_obtained_at = clock_fn.now();
340        match Self::worker_result_to_execution_event(
341            locked_execution.execution_id,
342            worker_result,
343            result_obtained_at,
344            locked_execution.parent,
345            can_be_retried,
346            unlock_expiry_on_limit_reached,
347        )? {
348            Some(append) => {
349                trace!("Appending {append:?}");
350                append.append(db_exec).await
351            }
352            None => Ok(()),
353        }
354    }
355
    /// Maps the result of a worker run to the event that should be appended to the
    /// execution log, turning a `WorkerError` into a temporary or a permanent failure.
    ///
    /// * `result_obtained_at` is used as the creation time of the event and as the base
    ///   for every computed backoff expiry.
    /// * `parent`, when set, produces a `ChildFinishedResponse` for every outcome that
    ///   finishes the execution.
    /// * `can_be_retried` carries the retry backoff; `None` means the failure is final.
    /// * `unlock_expiry_on_limit_reached` is the backoff used for `WorkerError::LimitReached`.
    ///
    /// Returns `Ok(None)` for `WorkerResult::DbUpdatedByWorkerOrWatcher`, i.e. nothing is
    /// left to append.
    ///
    /// # Errors
    /// Returns the inner database error of `WorkerError::DbError`.
    ///
    /// # Panics
    /// Panics on `WorkerError::ActivityReturnedError` when `can_be_retried` is `None`.
    fn worker_result_to_execution_event(
        execution_id: ExecutionId,
        worker_result: WorkerResult,
        result_obtained_at: DateTime<Utc>,
        parent: Option<(ExecutionId, JoinSetId)>,
        can_be_retried: Option<Duration>,
        unlock_expiry_on_limit_reached: Duration,
    ) -> Result<Option<Append>, DbErrorWrite> {
        Ok(match worker_result {
            WorkerResult::Ok(result, new_version, http_client_traces) => {
                info!(
                    "Execution finished: {}",
                    result.as_pending_state_finished_result()
                );
                let child_finished =
                    parent.map(
                        |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
                            parent_execution_id,
                            parent_join_set,
                            result: result.clone(),
                        },
                    );
                let primary_event = AppendRequest {
                    created_at: result_obtained_at,
                    event: ExecutionEventInner::Finished {
                        result,
                        http_client_traces,
                    },
                };

                Some(Append {
                    created_at: result_obtained_at,
                    primary_event,
                    execution_id,
                    version: new_version,
                    child_finished,
                })
            }
            // Nothing is appended for this result.
            WorkerResult::DbUpdatedByWorkerOrWatcher => None,
            WorkerResult::Err(err) => {
                let reason_full = err.to_string(); // WorkerError.display() usually contains a variant specific prefix + inner `reason`.
                let (primary_event, child_finished, version) = match err {
                    WorkerError::TemporaryTimeout {
                        http_client_traces,
                        version,
                    } => {
                        if let Some(duration) = can_be_retried {
                            let backoff_expires_at = result_obtained_at + duration;
                            info!(
                                "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
                            );
                            (
                                ExecutionEventInner::TemporarilyTimedOut {
                                    backoff_expires_at,
                                    http_client_traces,
                                },
                                None,
                                version,
                            )
                        } else {
                            info!("Permanent timeout");
                            let result = SupportedFunctionReturnValue::ExecutionError(
                                FinishedExecutionError::PermanentTimeout,
                            );
                            let child_finished =
                                parent.map(|(parent_execution_id, parent_join_set)| {
                                    ChildFinishedResponse {
                                        parent_execution_id,
                                        parent_join_set,
                                        result: result.clone(),
                                    }
                                });
                            (
                                ExecutionEventInner::Finished {
                                    result,
                                    http_client_traces,
                                },
                                child_finished,
                                version,
                            )
                        }
                    }
                    // Database errors are returned to the caller instead of being recorded as an event.
                    WorkerError::DbError(db_error) => {
                        return Err(db_error);
                    }
                    WorkerError::ActivityTrap {
                        reason: reason_inner,
                        trap_kind,
                        detail,
                        version,
                        http_client_traces,
                    } => retry_or_fail(
                        result_obtained_at,
                        parent,
                        can_be_retried,
                        reason_full,
                        reason_inner,
                        trap_kind, // short reason like `trap` for logs
                        detail,
                        version,
                        http_client_traces,
                    ),
                    WorkerError::ActivityPreopenedDirError {
                        reason_kind,
                        reason_inner,
                        version,
                    } => retry_or_fail(
                        result_obtained_at,
                        parent,
                        can_be_retried,
                        reason_full,
                        reason_inner,
                        reason_kind, // short reason for logs
                        None,        // detail
                        version,
                        None, // http_client_traces
                    ),
                    WorkerError::ActivityReturnedError {
                        detail,
                        version,
                        http_client_traces,
                    } => {
                        let duration = can_be_retried.expect(
                            "ActivityReturnedError must not be returned when retries are exhausted",
                        );
                        let expires_at = result_obtained_at + duration;
                        debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
                        (
                            ExecutionEventInner::TemporarilyFailed {
                                backoff_expires_at: expires_at,
                                reason_full: StrVariant::Static("activity returned error"), // is same as the variant's display message.
                                reason_inner: StrVariant::Static("activity returned error"),
                                detail, // contains the backtrace
                                http_client_traces,
                            },
                            None,
                            version,
                        )
                    }
                    // The execution is unlocked with a backoff rather than marked as failed.
                    WorkerError::LimitReached {
                        reason: inner_reason,
                        version: new_version,
                    } => {
                        let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
                        warn!(
                            "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
                        );
                        (
                            ExecutionEventInner::Unlocked {
                                backoff_expires_at: expires_at,
                                reason: StrVariant::from(reason_full),
                            },
                            None,
                            new_version,
                        )
                    }
                    // Fatal errors finish the execution regardless of `can_be_retried`.
                    WorkerError::FatalError(fatal_error, version) => {
                        info!("Fatal worker error - {fatal_error:?}");
                        let result = SupportedFunctionReturnValue::ExecutionError(
                            FinishedExecutionError::from(fatal_error),
                        );
                        let child_finished =
                            parent.map(|(parent_execution_id, parent_join_set)| {
                                ChildFinishedResponse {
                                    parent_execution_id,
                                    parent_join_set,
                                    result: result.clone(),
                                }
                            });
                        (
                            ExecutionEventInner::Finished {
                                result,
                                http_client_traces: None,
                            },
                            child_finished,
                            version,
                        )
                    }
                };
                Some(Append {
                    created_at: result_obtained_at,
                    primary_event: AppendRequest {
                        created_at: result_obtained_at,
                        event: primary_event,
                    },
                    execution_id,
                    version,
                    child_finished,
                })
            }
        })
    }
549}
550
551#[expect(clippy::too_many_arguments)]
552fn retry_or_fail(
553    result_obtained_at: DateTime<Utc>,
554    parent: Option<(ExecutionId, JoinSetId)>,
555    can_be_retried: Option<Duration>,
556    reason_full: String,
557    reason_inner: String,
558    err_kind_for_logs: impl Display,
559    detail: Option<String>,
560    version: Version,
561    http_client_traces: Option<Vec<concepts::storage::http_client_trace::HttpClientTrace>>,
562) -> (ExecutionEventInner, Option<ChildFinishedResponse>, Version) {
563    if let Some(duration) = can_be_retried {
564        let expires_at = result_obtained_at + duration;
565        debug!(
566            "Retrying activity with `{err_kind_for_logs}` execution after {duration:?} at {expires_at}"
567        );
568        (
569            ExecutionEventInner::TemporarilyFailed {
570                backoff_expires_at: expires_at,
571                reason_full: StrVariant::from(reason_full),
572                reason_inner: StrVariant::from(reason_inner),
573                detail,
574                http_client_traces,
575            },
576            None,
577            version,
578        )
579    } else {
580        info!("Activity with `{err_kind_for_logs}` marked as permanent failure - {reason_inner}");
581        let result = SupportedFunctionReturnValue::ExecutionError(
582            FinishedExecutionError::PermanentFailure {
583                reason_inner,
584                reason_full,
585                kind: PermanentFailureKind::ActivityTrap,
586                detail,
587            },
588        );
589
590        let child_finished =
591            parent.map(
592                |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
593                    parent_execution_id,
594                    parent_join_set,
595                    result: result.clone(),
596                },
597            );
598        (
599            ExecutionEventInner::Finished {
600                result,
601                http_client_traces,
602            },
603            child_finished,
604            version,
605        )
606    }
607}
608
/// Data needed to notify a parent execution that one of its child executions finished.
#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    /// Execution that receives the response.
    pub(crate) parent_execution_id: ExecutionId,
    /// Join set of the parent that the response is addressed to.
    pub(crate) parent_join_set: JoinSetId,
    /// Result of the finished child execution.
    pub(crate) result: SupportedFunctionReturnValue,
}
615
/// A pending write of one event to an execution log, optionally together with a response
/// to the parent execution.
#[derive(Debug, Clone)]
pub(crate) struct Append {
    /// Timestamp used for the batch and for the response sent to the parent.
    pub(crate) created_at: DateTime<Utc>,
    /// Event appended to the log of `execution_id`.
    pub(crate) primary_event: AppendRequest,
    pub(crate) execution_id: ExecutionId,
    /// Version passed to the database together with the event.
    pub(crate) version: Version,
    /// When set, `primary_event` must be a `Finished` event and `execution_id` must be
    /// `ExecutionId::Derived`; `Append::append` asserts both.
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}
624
625impl Append {
626    pub(crate) async fn append(self, db_exec: &dyn DbExecutor) -> Result<(), DbErrorWrite> {
627        if let Some(child_finished) = self.child_finished {
628            assert_matches!(
629                &self.primary_event,
630                AppendRequest {
631                    event: ExecutionEventInner::Finished { .. },
632                    ..
633                }
634            );
635            let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
636            db_exec
637                .append_batch_respond_to_parent(
638                    derived.clone(),
639                    self.created_at,
640                    vec![self.primary_event],
641                    self.version.clone(),
642                    child_finished.parent_execution_id,
643                    JoinSetResponseEventOuter {
644                        created_at: self.created_at,
645                        event: JoinSetResponseEvent {
646                            join_set_id: child_finished.parent_join_set,
647                            event: JoinSetResponse::ChildExecutionFinished {
648                                child_execution_id: derived,
649                                // Since self.primary_event is a finished event, the version will remain the same.
650                                finished_version: self.version,
651                                result: child_finished.result,
652                            },
653                        },
654                    },
655                )
656                .await?;
657        } else {
658            db_exec
659                .append(self.execution_id, self.version, self.primary_event)
660                .await?;
661        }
662        Ok(())
663    }
664}
665
/// A scripted `Worker` implementation for tests.
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes, RETURN_TYPE_DUMMY,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    /// Function name served by a `SimpleWorker` unless overridden with `with_ffqn`.
    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");
    /// Scripted runs keyed by the version expected at the start of the run; each value holds
    /// the expected event history and the result to return.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    /// Worker that replays scripted results and asserts the context it is run with.
    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        /// Scripted runs; `run` takes the last entry of the map each time it is called.
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    /// Builds the single exported function entry advertised for `ffqn`.
    fn exported_for(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
        [FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: RETURN_TYPE_DUMMY,
            extension: None,
            submittable: true,
        }]
    }

    impl SimpleWorker {
        /// Creates a worker scripted with one run that expects version 2 and an empty
        /// event history, and returns `res`.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let mut scripted = IndexMap::new();
            scripted.insert(Version::new(2), (Vec::new(), res));
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(scripted)))
        }

        /// Returns the worker with its served function replaced by `ffqn`, keeping the
        /// scripted results.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            Self {
                exported: exported_for(ffqn.clone()),
                ffqn,
                ..self
            }
        }

        /// Creates a worker serving `FFQN_SOME` with the given scripted results.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: exported_for(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        /// Takes the last scripted run, asserts that the context's version and event
        /// history match it, and returns the scripted result.
        ///
        /// Panics when no scripted run is left or when an expectation does not hold.
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            let (expected_version, scripted_run) =
                self.worker_results_rev.lock().unwrap().pop().unwrap();
            let (expected_eh, worker_result) = scripted_run;
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}
745
746#[cfg(test)]
747mod tests {
748    use self::simple_worker::SimpleWorker;
749    use super::*;
750    use crate::{expired_timers_watcher, worker::WorkerResult};
751    use assert_matches::assert_matches;
752    use async_trait::async_trait;
753    use concepts::storage::DbPoolCloseable;
754    use concepts::storage::{CreateRequest, DbConnection, JoinSetRequest};
755    use concepts::storage::{ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState};
756    use concepts::time::Now;
757    use concepts::{
758        ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, RETURN_TYPE_DUMMY,
759        SUPPORTED_RETURN_VALUE_OK_EMPTY, StrVariant, SupportedFunctionReturnValue, TrapKind,
760    };
761    use db_tests::Database;
762    use indexmap::IndexMap;
763    use simple_worker::FFQN_SOME;
764    use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
765    use test_utils::set_up;
766    use test_utils::sim_clock::{ConstClock, SimClock};
767
768    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
769
770    async fn tick_fn<W: Worker + Debug, C: ClockFn + 'static>(
771        config: ExecConfig,
772        clock_fn: C,
773        db_exec: Arc<dyn DbExecutor>,
774        worker: Arc<W>,
775        executed_at: DateTime<Utc>,
776    ) -> Vec<ExecutionId> {
777        trace!("Ticking with {worker:?}");
778        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
779        let executor = ExecTask::new(worker, config, clock_fn, db_exec, ffqns);
780        executor
781            .tick_test_await(executed_at, RunId::generate())
782            .await
783    }
784
785    #[tokio::test]
786    async fn execute_simple_lifecycle_tick_based_mem() {
787        let created_at = Now.now();
788        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
789        execute_simple_lifecycle_tick_based(
790            db_pool.connection().as_ref(),
791            db_exec.clone(),
792            ConstClock(created_at),
793        )
794        .await;
795        db_close.close().await;
796    }
797
798    #[tokio::test]
799    async fn execute_simple_lifecycle_tick_based_sqlite() {
800        let created_at = Now.now();
801        let (_guard, db_pool, db_exec, db_close) = Database::Sqlite.set_up().await;
802        execute_simple_lifecycle_tick_based(
803            db_pool.connection().as_ref(),
804            db_exec.clone(),
805            ConstClock(created_at),
806        )
807        .await;
808        db_close.close().await;
809    }
810
811    async fn execute_simple_lifecycle_tick_based<C: ClockFn + 'static>(
812        db_connection: &dyn DbConnection,
813        db_exec: Arc<dyn DbExecutor>,
814        clock_fn: C,
815    ) {
816        set_up();
817        let created_at = clock_fn.now();
818        let exec_config = ExecConfig {
819            batch_size: 1,
820            lock_expiry: Duration::from_secs(1),
821            tick_sleep: Duration::from_millis(100),
822            component_id: ComponentId::dummy_activity(),
823            task_limiter: None,
824            executor_id: ExecutorId::generate(),
825        };
826
827        let execution_log = create_and_tick(
828            CreateAndTickConfig {
829                execution_id: ExecutionId::generate(),
830                created_at,
831                max_retries: 0,
832                executed_at: created_at,
833                retry_exp_backoff: Duration::ZERO,
834            },
835            clock_fn,
836            db_connection,
837            db_exec,
838            exec_config,
839            Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
840                SUPPORTED_RETURN_VALUE_OK_EMPTY,
841                Version::new(2),
842                None,
843            ))),
844            tick_fn,
845        )
846        .await;
847        assert_matches!(
848            execution_log.events.get(2).unwrap(),
849            ExecutionEvent {
850                event: ExecutionEventInner::Finished {
851                    result: SupportedFunctionReturnValue::Ok { ok: None },
852                    http_client_traces: None
853                },
854                created_at: _,
855                backtrace_id: None,
856            }
857        );
858    }
859
860    #[tokio::test]
861    async fn execute_simple_lifecycle_task_based_mem() {
862        set_up();
863        let created_at = Now.now();
864        let clock_fn = ConstClock(created_at);
865        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
866        let exec_config = ExecConfig {
867            batch_size: 1,
868            lock_expiry: Duration::from_secs(1),
869            tick_sleep: Duration::ZERO,
870            component_id: ComponentId::dummy_activity(),
871            task_limiter: None,
872            executor_id: ExecutorId::generate(),
873        };
874
875        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
876            SUPPORTED_RETURN_VALUE_OK_EMPTY,
877            Version::new(2),
878            None,
879        )));
880
881        let execution_log = create_and_tick(
882            CreateAndTickConfig {
883                execution_id: ExecutionId::generate(),
884                created_at,
885                max_retries: 0,
886                executed_at: created_at,
887                retry_exp_backoff: Duration::ZERO,
888            },
889            clock_fn,
890            db_pool.connection().as_ref(),
891            db_exec,
892            exec_config,
893            worker,
894            tick_fn,
895        )
896        .await;
897        assert_matches!(
898            execution_log.events.get(2).unwrap(),
899            ExecutionEvent {
900                event: ExecutionEventInner::Finished {
901                    result: SupportedFunctionReturnValue::Ok { ok: None },
902                    http_client_traces: None
903                },
904                created_at: _,
905                backtrace_id: None,
906            }
907        );
908        db_close.close().await;
909    }
910
911    struct CreateAndTickConfig {
912        execution_id: ExecutionId,
913        created_at: DateTime<Utc>,
914        max_retries: u32,
915        executed_at: DateTime<Utc>,
916        retry_exp_backoff: Duration,
917    }
918
919    async fn create_and_tick<
920        W: Worker,
921        C: ClockFn,
922        T: FnMut(ExecConfig, C, Arc<dyn DbExecutor>, Arc<W>, DateTime<Utc>) -> F,
923        F: Future<Output = Vec<ExecutionId>>,
924    >(
925        config: CreateAndTickConfig,
926        clock_fn: C,
927        db_connection: &dyn DbConnection,
928        db_exec: Arc<dyn DbExecutor>,
929        exec_config: ExecConfig,
930        worker: Arc<W>,
931        mut tick: T,
932    ) -> ExecutionLog {
933        // Create an execution
934        db_connection
935            .create(CreateRequest {
936                created_at: config.created_at,
937                execution_id: config.execution_id.clone(),
938                ffqn: FFQN_SOME,
939                params: Params::empty(),
940                parent: None,
941                metadata: concepts::ExecutionMetadata::empty(),
942                scheduled_at: config.created_at,
943                retry_exp_backoff: config.retry_exp_backoff,
944                max_retries: config.max_retries,
945                component_id: ComponentId::dummy_activity(),
946                scheduled_by: None,
947            })
948            .await
949            .unwrap();
950        // execute!
951        tick(exec_config, clock_fn, db_exec, worker, config.executed_at).await;
952        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
953        debug!("Execution history after tick: {execution_log:?}");
954        // check that DB contains Created and Locked events.
955        assert_matches!(
956            execution_log.events.first().unwrap(),
957            ExecutionEvent {
958                event: ExecutionEventInner::Created { .. },
959                created_at: actually_created_at,
960                backtrace_id: None,
961            }
962            if config.created_at == *actually_created_at
963        );
964        let locked_at = assert_matches!(
965            execution_log.events.get(1).unwrap(),
966            ExecutionEvent {
967                event: ExecutionEventInner::Locked { .. },
968                created_at: locked_at,
969                backtrace_id: None,
970            } if config.created_at <= *locked_at
971            => *locked_at
972        );
973        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
974            event: _,
975            created_at: executed_at,
976            backtrace_id: None,
977        } if *executed_at >= locked_at);
978        execution_log
979    }
980
981    #[tokio::test]
982    async fn activity_trap_should_trigger_an_execution_retry() {
983        set_up();
984        let sim_clock = SimClock::default();
985        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
986        let exec_config = ExecConfig {
987            batch_size: 1,
988            lock_expiry: Duration::from_secs(1),
989            tick_sleep: Duration::ZERO,
990            component_id: ComponentId::dummy_activity(),
991            task_limiter: None,
992            executor_id: ExecutorId::generate(),
993        };
994        let expected_reason = "error reason";
995        let expected_detail = "error detail";
996        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
997            WorkerError::ActivityTrap {
998                reason: expected_reason.to_string(),
999                trap_kind: concepts::TrapKind::Trap,
1000                detail: Some(expected_detail.to_string()),
1001                version: Version::new(2),
1002                http_client_traces: None,
1003            },
1004        )));
1005        let retry_exp_backoff = Duration::from_millis(100);
1006        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
1007        let execution_log = create_and_tick(
1008            CreateAndTickConfig {
1009                execution_id: ExecutionId::generate(),
1010                created_at: sim_clock.now(),
1011                max_retries: 1,
1012                executed_at: sim_clock.now(),
1013                retry_exp_backoff,
1014            },
1015            sim_clock.clone(),
1016            db_pool.connection().as_ref(),
1017            db_exec.clone(),
1018            exec_config.clone(),
1019            worker,
1020            tick_fn,
1021        )
1022        .await;
1023        assert_eq!(3, execution_log.events.len());
1024        {
1025            let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
1026                &execution_log.events.get(2).unwrap(),
1027                ExecutionEvent {
1028                    event: ExecutionEventInner::TemporarilyFailed {
1029                        reason_inner,
1030                        reason_full,
1031                        detail,
1032                        backoff_expires_at,
1033                        http_client_traces: None,
1034                    },
1035                    created_at: at,
1036                    backtrace_id: None,
1037                }
1038                => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
1039            );
1040            assert_eq!(expected_reason, reason_inner.deref());
1041            assert_eq!(
1042                format!("activity trap: {expected_reason}"),
1043                reason_full.deref()
1044            );
1045            assert_eq!(Some(expected_detail), detail.as_deref());
1046            assert_eq!(at, sim_clock.now());
1047            assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
1048        }
1049        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
1050            std::sync::Mutex::new(IndexMap::from([(
1051                Version::new(4),
1052                (
1053                    vec![],
1054                    WorkerResult::Ok(SUPPORTED_RETURN_VALUE_OK_EMPTY, Version::new(4), None),
1055                ),
1056            )])),
1057        )));
1058        // noop until `retry_exp_backoff` expires
1059        assert!(
1060            tick_fn(
1061                exec_config.clone(),
1062                sim_clock.clone(),
1063                db_exec.clone(),
1064                worker.clone(),
1065                sim_clock.now(),
1066            )
1067            .await
1068            .is_empty()
1069        );
1070        // tick again to finish the execution
1071        sim_clock.move_time_forward(retry_exp_backoff);
1072        tick_fn(
1073            exec_config,
1074            sim_clock.clone(),
1075            db_exec.clone(),
1076            worker,
1077            sim_clock.now(),
1078        )
1079        .await;
1080        let execution_log = {
1081            let db_connection = db_pool.connection();
1082            db_connection
1083                .get(&execution_log.execution_id)
1084                .await
1085                .unwrap()
1086        };
1087        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
1088        assert_matches!(
1089            execution_log.events.get(3).unwrap(),
1090            ExecutionEvent {
1091                event: ExecutionEventInner::Locked { .. },
1092                created_at: at,
1093                backtrace_id: None,
1094            } if *at == sim_clock.now()
1095        );
1096        assert_matches!(
1097            execution_log.events.get(4).unwrap(),
1098            ExecutionEvent {
1099                event: ExecutionEventInner::Finished {
1100                    result: SupportedFunctionReturnValue::Ok{ok:None},
1101                    http_client_traces: None
1102                },
1103                created_at: finished_at,
1104                backtrace_id: None,
1105            } if *finished_at == sim_clock.now()
1106        );
1107        db_close.close().await;
1108    }
1109
1110    #[tokio::test]
1111    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1112        set_up();
1113        let created_at = Now.now();
1114        let clock_fn = ConstClock(created_at);
1115        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1116        let exec_config = ExecConfig {
1117            batch_size: 1,
1118            lock_expiry: Duration::from_secs(1),
1119            tick_sleep: Duration::ZERO,
1120            component_id: ComponentId::dummy_activity(),
1121            task_limiter: None,
1122            executor_id: ExecutorId::generate(),
1123        };
1124
1125        let expected_reason = "error reason";
1126        let expected_detail = "error detail";
1127        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1128            WorkerError::ActivityTrap {
1129                reason: expected_reason.to_string(),
1130                trap_kind: concepts::TrapKind::Trap,
1131                detail: Some(expected_detail.to_string()),
1132                version: Version::new(2),
1133                http_client_traces: None,
1134            },
1135        )));
1136        let execution_log = create_and_tick(
1137            CreateAndTickConfig {
1138                execution_id: ExecutionId::generate(),
1139                created_at,
1140                max_retries: 0,
1141                executed_at: created_at,
1142                retry_exp_backoff: Duration::ZERO,
1143            },
1144            clock_fn,
1145            db_pool.connection().as_ref(),
1146            db_exec,
1147            exec_config.clone(),
1148            worker,
1149            tick_fn,
1150        )
1151        .await;
1152        assert_eq!(3, execution_log.events.len());
1153        let (reason, kind, detail) = assert_matches!(
1154            &execution_log.events.get(2).unwrap(),
1155            ExecutionEvent {
1156                event: ExecutionEventInner::Finished{
1157                    result: SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_}),
1158                    http_client_traces: None
1159                },
1160                created_at: at,
1161                backtrace_id: None,
1162            } if *at == created_at
1163            => (reason_inner, kind, detail)
1164        );
1165        assert_eq!(expected_reason, *reason);
1166        assert_eq!(Some(expected_detail), detail.as_deref());
1167        assert_eq!(PermanentFailureKind::ActivityTrap, *kind);
1168
1169        db_close.close().await;
1170    }
1171
1172    #[tokio::test]
1173    async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1174        let worker_error = WorkerError::ActivityTrap {
1175            reason: "error reason".to_string(),
1176            trap_kind: TrapKind::Trap,
1177            detail: Some("detail".to_string()),
1178            version: Version::new(2),
1179            http_client_traces: None,
1180        };
1181        let expected_child_err = FinishedExecutionError::PermanentFailure {
1182            reason_full: "activity trap: error reason".to_string(),
1183            reason_inner: "error reason".to_string(),
1184            kind: PermanentFailureKind::ActivityTrap,
1185            detail: Some("detail".to_string()),
1186        };
1187        child_execution_permanently_failed_should_notify_parent(
1188            WorkerResult::Err(worker_error),
1189            expected_child_err,
1190        )
1191        .await;
1192    }
1193
1194    // TODO: Add test for WorkerError::TemporaryTimeout
1195
1196    #[tokio::test]
1197    async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1198        let expected_child_err = FinishedExecutionError::PermanentTimeout;
1199        child_execution_permanently_failed_should_notify_parent(
1200            WorkerResult::DbUpdatedByWorkerOrWatcher,
1201            expected_child_err,
1202        )
1203        .await;
1204    }
1205
1206    async fn child_execution_permanently_failed_should_notify_parent(
1207        worker_result: WorkerResult,
1208        expected_child_err: FinishedExecutionError,
1209    ) {
1210        use concepts::storage::JoinSetResponseEventOuter;
1211        const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1212
1213        set_up();
1214        let sim_clock = SimClock::default();
1215        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1216
1217        let parent_worker = Arc::new(SimpleWorker::with_single_result(
1218            WorkerResult::DbUpdatedByWorkerOrWatcher,
1219        ));
1220        let parent_execution_id = ExecutionId::generate();
1221        db_pool
1222            .connection()
1223            .create(CreateRequest {
1224                created_at: sim_clock.now(),
1225                execution_id: parent_execution_id.clone(),
1226                ffqn: FFQN_SOME,
1227                params: Params::empty(),
1228                parent: None,
1229                metadata: concepts::ExecutionMetadata::empty(),
1230                scheduled_at: sim_clock.now(),
1231                retry_exp_backoff: Duration::ZERO,
1232                max_retries: 0,
1233                component_id: ComponentId::dummy_activity(),
1234                scheduled_by: None,
1235            })
1236            .await
1237            .unwrap();
1238        tick_fn(
1239            ExecConfig {
1240                batch_size: 1,
1241                lock_expiry: LOCK_EXPIRY,
1242                tick_sleep: Duration::ZERO,
1243                component_id: ComponentId::dummy_activity(),
1244                task_limiter: None,
1245                executor_id: ExecutorId::generate(),
1246            },
1247            sim_clock.clone(),
1248            db_exec.clone(),
1249            parent_worker,
1250            sim_clock.now(),
1251        )
1252        .await;
1253
1254        let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1255        let child_execution_id = parent_execution_id.next_level(&join_set_id);
1256        // executor does not append anything, this should have been written by the worker:
1257        {
1258            let child = CreateRequest {
1259                created_at: sim_clock.now(),
1260                execution_id: ExecutionId::Derived(child_execution_id.clone()),
1261                ffqn: FFQN_CHILD,
1262                params: Params::empty(),
1263                parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1264                metadata: concepts::ExecutionMetadata::empty(),
1265                scheduled_at: sim_clock.now(),
1266                retry_exp_backoff: Duration::ZERO,
1267                max_retries: 0,
1268                component_id: ComponentId::dummy_activity(),
1269                scheduled_by: None,
1270            };
1271            let current_time = sim_clock.now();
1272            let join_set = AppendRequest {
1273                created_at: current_time,
1274                event: ExecutionEventInner::HistoryEvent {
1275                    event: HistoryEvent::JoinSetCreate {
1276                        join_set_id: join_set_id.clone(),
1277                        closing_strategy: ClosingStrategy::Complete,
1278                    },
1279                },
1280            };
1281            let child_exec_req = AppendRequest {
1282                created_at: current_time,
1283                event: ExecutionEventInner::HistoryEvent {
1284                    event: HistoryEvent::JoinSetRequest {
1285                        join_set_id: join_set_id.clone(),
1286                        request: JoinSetRequest::ChildExecutionRequest {
1287                            child_execution_id: child_execution_id.clone(),
1288                        },
1289                    },
1290                },
1291            };
1292            let join_next = AppendRequest {
1293                created_at: current_time,
1294                event: ExecutionEventInner::HistoryEvent {
1295                    event: HistoryEvent::JoinNext {
1296                        join_set_id: join_set_id.clone(),
1297                        run_expires_at: sim_clock.now(),
1298                        closing: false,
1299                        requested_ffqn: Some(FFQN_CHILD),
1300                    },
1301                },
1302            };
1303            db_pool
1304                .connection()
1305                .append_batch_create_new_execution(
1306                    current_time,
1307                    vec![join_set, child_exec_req, join_next],
1308                    parent_execution_id.clone(),
1309                    Version::new(2),
1310                    vec![child],
1311                )
1312                .await
1313                .unwrap();
1314        }
1315
1316        let child_worker =
1317            Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));
1318
1319        // execute the child
1320        tick_fn(
1321            ExecConfig {
1322                batch_size: 1,
1323                lock_expiry: LOCK_EXPIRY,
1324                tick_sleep: Duration::ZERO,
1325                component_id: ComponentId::dummy_activity(),
1326                task_limiter: None,
1327                executor_id: ExecutorId::generate(),
1328            },
1329            sim_clock.clone(),
1330            db_exec.clone(),
1331            child_worker,
1332            sim_clock.now(),
1333        )
1334        .await;
1335        if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
1336            // In case of timeout, let the timers watcher handle it
1337            sim_clock.move_time_forward(LOCK_EXPIRY);
1338            expired_timers_watcher::tick(db_pool.connection().as_ref(), sim_clock.now())
1339                .await
1340                .unwrap();
1341        }
1342        let child_log = db_pool
1343            .connection()
1344            .get(&ExecutionId::Derived(child_execution_id.clone()))
1345            .await
1346            .unwrap();
1347        assert!(child_log.pending_state.is_finished());
1348        assert_eq!(
1349            Version(2),
1350            child_log.next_version,
1351            "created = 0, locked = 1, with_single_result = 2"
1352        );
1353        assert_eq!(
1354            ExecutionEventInner::Finished {
1355                result: SupportedFunctionReturnValue::ExecutionError(expected_child_err),
1356                http_client_traces: None
1357            },
1358            child_log.last_event().event
1359        );
1360        let parent_log = db_pool
1361            .connection()
1362            .get(&parent_execution_id)
1363            .await
1364            .unwrap();
1365        assert_matches!(
1366            parent_log.pending_state,
1367            PendingState::PendingAt {
1368                scheduled_at
1369            } if scheduled_at == sim_clock.now(),
1370            "parent should be back to pending"
1371        );
1372        let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1373            parent_log.responses.last(),
1374            Some(JoinSetResponseEventOuter{
1375                created_at: at,
1376                event: JoinSetResponseEvent{
1377                    join_set_id: found_join_set_id,
1378                    event: JoinSetResponse::ChildExecutionFinished {
1379                        child_execution_id: found_child_execution_id,
1380                        finished_version,
1381                        result: found_result,
1382                    }
1383                }
1384            })
1385             if *at == sim_clock.now()
1386            => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1387        );
1388        assert_eq!(join_set_id, *found_join_set_id);
1389        assert_eq!(child_execution_id, *found_child_execution_id);
1390        assert_eq!(child_log.next_version, *child_finished_version);
1391        assert_matches!(
1392            found_result,
1393            SupportedFunctionReturnValue::ExecutionError(_)
1394        );
1395
1396        db_close.close().await;
1397    }
1398
1399    #[derive(Clone, Debug)]
1400    struct SleepyWorker {
1401        duration: Duration,
1402        result: SupportedFunctionReturnValue,
1403        exported: [FunctionMetadata; 1],
1404    }
1405
1406    #[async_trait]
1407    impl Worker for SleepyWorker {
1408        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1409            tokio::time::sleep(self.duration).await;
1410            WorkerResult::Ok(self.result.clone(), ctx.version, None)
1411        }
1412
1413        fn exported_functions(&self) -> &[FunctionMetadata] {
1414            &self.exported
1415        }
1416    }
1417
1418    #[tokio::test]
1419    async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1420        set_up();
1421        let sim_clock = SimClock::default();
1422        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1423        let lock_expiry = Duration::from_millis(100);
1424        let exec_config = ExecConfig {
1425            batch_size: 1,
1426            lock_expiry,
1427            tick_sleep: Duration::ZERO,
1428            component_id: ComponentId::dummy_activity(),
1429            task_limiter: None,
1430            executor_id: ExecutorId::generate(),
1431        };
1432
1433        let worker = Arc::new(SleepyWorker {
1434            duration: lock_expiry + Duration::from_millis(1), // sleep more than allowed by the lock expiry
1435            result: SUPPORTED_RETURN_VALUE_OK_EMPTY,
1436            exported: [FunctionMetadata {
1437                ffqn: FFQN_SOME,
1438                parameter_types: ParameterTypes::default(),
1439                return_type: RETURN_TYPE_DUMMY,
1440                extension: None,
1441                submittable: true,
1442            }],
1443        });
1444        // Create an execution
1445        let execution_id = ExecutionId::generate();
1446        let timeout_duration = Duration::from_millis(300);
1447        let db_connection = db_pool.connection();
1448        db_connection
1449            .create(CreateRequest {
1450                created_at: sim_clock.now(),
1451                execution_id: execution_id.clone(),
1452                ffqn: FFQN_SOME,
1453                params: Params::empty(),
1454                parent: None,
1455                metadata: concepts::ExecutionMetadata::empty(),
1456                scheduled_at: sim_clock.now(),
1457                retry_exp_backoff: timeout_duration,
1458                max_retries: 1,
1459                component_id: ComponentId::dummy_activity(),
1460                scheduled_by: None,
1461            })
1462            .await
1463            .unwrap();
1464
1465        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
1466        let executor = ExecTask::new(
1467            worker,
1468            exec_config,
1469            sim_clock.clone(),
1470            db_exec.clone(),
1471            ffqns,
1472        );
1473        let mut first_execution_progress = executor
1474            .tick(sim_clock.now(), RunId::generate())
1475            .await
1476            .unwrap();
1477        assert_eq!(1, first_execution_progress.executions.len());
1478        // Started hanging, wait for lock expiry.
1479        sim_clock.move_time_forward(lock_expiry);
1480        // cleanup should be called
1481        let now_after_first_lock_expiry = sim_clock.now();
1482        {
1483            debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1484            let cleanup_progress = executor
1485                .tick(now_after_first_lock_expiry, RunId::generate())
1486                .await
1487                .unwrap();
1488            assert!(cleanup_progress.executions.is_empty());
1489        }
1490        {
1491            let expired_locks = expired_timers_watcher::tick(
1492                db_pool.connection().as_ref(),
1493                now_after_first_lock_expiry,
1494            )
1495            .await
1496            .unwrap()
1497            .expired_locks;
1498            assert_eq!(1, expired_locks);
1499        }
1500        assert!(
1501            !first_execution_progress
1502                .executions
1503                .pop()
1504                .unwrap()
1505                .1
1506                .is_finished()
1507        );
1508
1509        let execution_log = db_connection.get(&execution_id).await.unwrap();
1510        let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1511        assert_matches!(
1512            &execution_log.events.get(2).unwrap(),
1513            ExecutionEvent {
1514                event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
1515                created_at: at,
1516                backtrace_id: None,
1517            } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1518        );
1519        assert_eq!(
1520            PendingState::PendingAt {
1521                scheduled_at: expected_first_timeout_expiry
1522            },
1523            execution_log.pending_state
1524        );
1525        sim_clock.move_time_forward(timeout_duration);
1526        let now_after_first_timeout = sim_clock.now();
1527        debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1528
1529        let mut second_execution_progress = executor
1530            .tick(now_after_first_timeout, RunId::generate())
1531            .await
1532            .unwrap();
1533        assert_eq!(1, second_execution_progress.executions.len());
1534
1535        // Started hanging, wait for lock expiry.
1536        sim_clock.move_time_forward(lock_expiry);
1537        // cleanup should be called
1538        let now_after_second_lock_expiry = sim_clock.now();
1539        debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1540        {
1541            let cleanup_progress = executor
1542                .tick(now_after_second_lock_expiry, RunId::generate())
1543                .await
1544                .unwrap();
1545            assert!(cleanup_progress.executions.is_empty());
1546        }
1547        {
1548            let expired_locks = expired_timers_watcher::tick(
1549                db_pool.connection().as_ref(),
1550                now_after_second_lock_expiry,
1551            )
1552            .await
1553            .unwrap()
1554            .expired_locks;
1555            assert_eq!(1, expired_locks);
1556        }
1557        assert!(
1558            !second_execution_progress
1559                .executions
1560                .pop()
1561                .unwrap()
1562                .1
1563                .is_finished()
1564        );
1565
1566        drop(db_connection);
1567        drop(executor);
1568        db_close.close().await;
1569    }
1570}