obeli_sk_executor/
executor.rs

1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::prefixed_ulid::RunId;
5use concepts::storage::{
6    AppendEventsToExecution, AppendRequest, AppendResponseToExecution, DbErrorGeneric,
7    DbErrorWrite, DbExecutor, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
8    LockedExecution,
9};
10use concepts::time::{ClockFn, Sleep};
11use concepts::{ComponentId, FunctionMetadata, StrVariant, SupportedFunctionReturnValue};
12use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
13use concepts::{
14    FinishedExecutionError,
15    storage::{ExecutionEventInner, JoinSetResponse, Version},
16};
17use concepts::{JoinSetId, PermanentFailureKind};
18use std::fmt::Display;
19use std::{
20    sync::{
21        Arc,
22        atomic::{AtomicBool, Ordering},
23    },
24    time::Duration,
25};
26use tokio::task::{AbortHandle, JoinHandle};
27use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
28
29#[derive(Debug, Clone)]
30pub struct ExecConfig {
31    pub lock_expiry: Duration,
32    pub tick_sleep: Duration,
33    pub batch_size: u32,
34    pub component_id: ComponentId,
35    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
36    pub executor_id: ExecutorId,
37}
38
39pub struct ExecTask<C: ClockFn> {
40    worker: Arc<dyn Worker>,
41    config: ExecConfig,
42    clock_fn: C, // Used for obtaining current time when the execution finishes.
43    db_exec: Arc<dyn DbExecutor>,
44    ffqns: Arc<[FunctionFqn]>,
45}
46
47#[derive(derive_more::Debug, Default)]
48pub struct ExecutionProgress {
49    #[debug(skip)]
50    #[allow(dead_code)]
51    executions: Vec<(ExecutionId, JoinHandle<()>)>,
52}
53
54impl ExecutionProgress {
55    #[cfg(feature = "test")]
56    pub async fn wait_for_tasks(self) -> Vec<ExecutionId> {
57        let mut vec = Vec::new();
58        for (exe, join_handle) in self.executions {
59            vec.push(exe);
60            join_handle.await.unwrap();
61        }
62        vec
63    }
64}
65
66#[derive(derive_more::Debug)]
67pub struct ExecutorTaskHandle {
68    #[debug(skip)]
69    is_closing: Arc<AtomicBool>,
70    #[debug(skip)]
71    abort_handle: AbortHandle,
72    component_id: ComponentId,
73    executor_id: ExecutorId,
74}
75
76impl ExecutorTaskHandle {
77    #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
78    pub async fn close(&self) {
79        trace!("Gracefully closing");
80        self.is_closing.store(true, Ordering::Relaxed);
81        while !self.abort_handle.is_finished() {
82            tokio::time::sleep(Duration::from_millis(1)).await;
83        }
84        debug!("Gracefully closed");
85    }
86}
87
88impl Drop for ExecutorTaskHandle {
89    #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
90    fn drop(&mut self) {
91        if self.abort_handle.is_finished() {
92            return;
93        }
94        warn!("Aborting the executor task");
95        self.abort_handle.abort();
96    }
97}
98
/// Test-only entry point to `extract_exported_ffqns_noext`.
#[cfg(feature = "test")]
pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    self::extract_exported_ffqns_noext(worker)
}
103
104fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
105    worker
106        .exported_functions()
107        .iter()
108        .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
109        .collect::<Arc<_>>()
110}
111
112impl<C: ClockFn + 'static> ExecTask<C> {
113    #[cfg(feature = "test")]
114    pub fn new(
115        worker: Arc<dyn Worker>,
116        config: ExecConfig,
117        clock_fn: C,
118        db_exec: Arc<dyn DbExecutor>,
119        ffqns: Arc<[FunctionFqn]>,
120    ) -> Self {
121        Self {
122            worker,
123            config,
124            clock_fn,
125            db_exec,
126            ffqns,
127        }
128    }
129
130    #[cfg(feature = "test")]
131    pub fn new_all_ffqns_test(
132        worker: Arc<dyn Worker>,
133        config: ExecConfig,
134        clock_fn: C,
135        db_exec: Arc<dyn DbExecutor>,
136    ) -> Self {
137        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
138        Self {
139            worker,
140            config,
141            clock_fn,
142            db_exec,
143            ffqns,
144        }
145    }
146
147    pub fn spawn_new(
148        worker: Arc<dyn Worker>,
149        config: ExecConfig,
150        clock_fn: C,
151        db_exec: Arc<dyn DbExecutor>,
152        sleep: impl Sleep + 'static,
153    ) -> ExecutorTaskHandle {
154        let is_closing = Arc::new(AtomicBool::default());
155        let is_closing_inner = is_closing.clone();
156        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
157        let component_id = config.component_id.clone();
158        let executor_id = config.executor_id;
159        let abort_handle = tokio::spawn(async move {
160            debug!(executor_id = %config.executor_id, component_id = %config.component_id, "Spawned executor");
161            let task = ExecTask {
162                worker,
163                config,
164                db_exec,
165                ffqns: ffqns.clone(),
166                clock_fn: clock_fn.clone(),
167            };
168            while !is_closing_inner.load(Ordering::Relaxed) {
169                let _ = task.tick(clock_fn.now(), RunId::generate()).await;
170
171                task.db_exec
172                    .wait_for_pending(clock_fn.now(), ffqns.clone(), {
173                        let sleep = sleep.clone();
174                        Box::pin(async move { sleep.sleep(task.config.tick_sleep).await })})
175                    .await;
176            }
177        })
178        .abort_handle();
179        ExecutorTaskHandle {
180            is_closing,
181            abort_handle,
182            component_id,
183            executor_id,
184        }
185    }
186
187    fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
188        if let Some(task_limiter) = &self.config.task_limiter {
189            let mut locks = Vec::new();
190            for _ in 0..self.config.batch_size {
191                if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
192                    locks.push(Some(permit));
193                } else {
194                    break;
195                }
196            }
197            locks
198        } else {
199            let mut vec = Vec::with_capacity(self.config.batch_size as usize);
200            for _ in 0..self.config.batch_size {
201                vec.push(None);
202            }
203            vec
204        }
205    }
206
207    #[cfg(feature = "test")]
208    pub async fn tick_test(&self, executed_at: DateTime<Utc>, run_id: RunId) -> ExecutionProgress {
209        self.tick(executed_at, run_id).await.unwrap()
210    }
211
212    #[cfg(feature = "test")]
213    pub async fn tick_test_await(
214        &self,
215        executed_at: DateTime<Utc>,
216        run_id: RunId,
217    ) -> Vec<ExecutionId> {
218        self.tick(executed_at, run_id)
219            .await
220            .unwrap()
221            .wait_for_tasks()
222            .await
223    }
224
225    #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.config.executor_id, component_id = %self.config.component_id))]
226    async fn tick(
227        &self,
228        executed_at: DateTime<Utc>,
229        run_id: RunId,
230    ) -> Result<ExecutionProgress, DbErrorGeneric> {
231        let locked_executions = {
232            let mut permits = self.acquire_task_permits();
233            if permits.is_empty() {
234                return Ok(ExecutionProgress::default());
235            }
236            let lock_expires_at = executed_at + self.config.lock_expiry;
237            let locked_executions = self
238                .db_exec
239                .lock_pending(
240                    permits.len(), // batch size
241                    executed_at,   // fetch expiring before now
242                    self.ffqns.clone(),
243                    executed_at, // created at
244                    self.config.component_id.clone(),
245                    self.config.executor_id,
246                    lock_expires_at,
247                    run_id,
248                )
249                .await?;
250            // Drop permits if too many were allocated.
251            while permits.len() > locked_executions.len() {
252                permits.pop();
253            }
254            assert_eq!(permits.len(), locked_executions.len());
255            locked_executions.into_iter().zip(permits)
256        };
257        let execution_deadline = executed_at + self.config.lock_expiry;
258
259        let mut executions = Vec::with_capacity(locked_executions.len());
260        for (locked_execution, permit) in locked_executions {
261            let execution_id = locked_execution.execution_id.clone();
262            let join_handle = {
263                let worker = self.worker.clone();
264                let db = self.db_exec.clone();
265                let clock_fn = self.clock_fn.clone();
266                let run_id = locked_execution.run_id;
267                let worker_span = info_span!(parent: None, "worker",
268                    "otel.name" = format!("worker {}", locked_execution.ffqn),
269                    %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.config.executor_id, component_id = %self.config.component_id);
270                locked_execution.metadata.enrich(&worker_span);
271                tokio::spawn({
272                    let worker_span2 = worker_span.clone();
273                    async move {
274                        let _permit = permit;
275                        let res = Self::run_worker(
276                            worker,
277                            db.as_ref(),
278                            execution_deadline,
279                            clock_fn,
280                            locked_execution,
281                            worker_span2,
282                        )
283                        .await;
284                        if let Err(db_error) = res {
285                            error!("Got db error `{db_error:?}`, expecting watcher to mark execution as timed out");
286                        }
287                    }
288                    .instrument(worker_span)
289                })
290            };
291            executions.push((execution_id, join_handle));
292        }
293        Ok(ExecutionProgress { executions })
294    }
295
296    async fn run_worker(
297        worker: Arc<dyn Worker>,
298        db_exec: &dyn DbExecutor,
299        execution_deadline: DateTime<Utc>,
300        clock_fn: C,
301        locked_execution: LockedExecution,
302        worker_span: Span,
303    ) -> Result<(), DbErrorWrite> {
304        debug!("Worker::run starting");
305        trace!(
306            version = %locked_execution.next_version,
307            params = ?locked_execution.params,
308            event_history = ?locked_execution.event_history,
309            "Worker::run starting"
310        );
311        let can_be_retried = ExecutionLog::can_be_retried_after(
312            locked_execution.intermittent_event_count + 1,
313            locked_execution.max_retries,
314            locked_execution.retry_exp_backoff,
315        );
316        let unlock_expiry_on_limit_reached =
317            ExecutionLog::compute_retry_duration_when_retrying_forever(
318                locked_execution.intermittent_event_count + 1,
319                locked_execution.retry_exp_backoff,
320            );
321        let ctx = WorkerContext {
322            execution_id: locked_execution.execution_id.clone(),
323            metadata: locked_execution.metadata,
324            ffqn: locked_execution.ffqn,
325            params: locked_execution.params,
326            event_history: locked_execution.event_history,
327            responses: locked_execution
328                .responses
329                .into_iter()
330                .map(|outer| outer.event)
331                .collect(),
332            version: locked_execution.next_version,
333            execution_deadline,
334            can_be_retried: can_be_retried.is_some(),
335            run_id: locked_execution.run_id,
336            worker_span,
337        };
338        let worker_result = worker.run(ctx).await;
339        trace!(?worker_result, "Worker::run finished");
340        let result_obtained_at = clock_fn.now();
341        match Self::worker_result_to_execution_event(
342            locked_execution.execution_id,
343            worker_result,
344            result_obtained_at,
345            locked_execution.parent,
346            can_be_retried,
347            unlock_expiry_on_limit_reached,
348        )? {
349            Some(append) => {
350                trace!("Appending {append:?}");
351                append.append(db_exec).await
352            }
353            None => Ok(()),
354        }
355    }
356
357    /// Map the `WorkerError` to an temporary or a permanent failure.
358    fn worker_result_to_execution_event(
359        execution_id: ExecutionId,
360        worker_result: WorkerResult,
361        result_obtained_at: DateTime<Utc>,
362        parent: Option<(ExecutionId, JoinSetId)>,
363        can_be_retried: Option<Duration>,
364        unlock_expiry_on_limit_reached: Duration,
365    ) -> Result<Option<Append>, DbErrorWrite> {
366        Ok(match worker_result {
367            WorkerResult::Ok(result, new_version, http_client_traces) => {
368                info!(
369                    "Execution finished: {}",
370                    result.as_pending_state_finished_result()
371                );
372                let child_finished =
373                    parent.map(
374                        |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
375                            parent_execution_id,
376                            parent_join_set,
377                            result: result.clone(),
378                        },
379                    );
380                let primary_event = AppendRequest {
381                    created_at: result_obtained_at,
382                    event: ExecutionEventInner::Finished {
383                        result,
384                        http_client_traces,
385                    },
386                };
387
388                Some(Append {
389                    created_at: result_obtained_at,
390                    primary_event,
391                    execution_id,
392                    version: new_version,
393                    child_finished,
394                })
395            }
396            WorkerResult::DbUpdatedByWorkerOrWatcher => None,
397            WorkerResult::Err(err) => {
398                let reason_full = err.to_string(); // WorkerError.display() usually contains a variant specific prefix + inner `reason`.
399                let (primary_event, child_finished, version) = match err {
400                    WorkerError::TemporaryTimeout {
401                        http_client_traces,
402                        version,
403                    } => {
404                        if let Some(duration) = can_be_retried {
405                            let backoff_expires_at = result_obtained_at + duration;
406                            info!(
407                                "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
408                            );
409                            (
410                                ExecutionEventInner::TemporarilyTimedOut {
411                                    backoff_expires_at,
412                                    http_client_traces,
413                                },
414                                None,
415                                version,
416                            )
417                        } else {
418                            info!("Permanent timeout");
419                            let result = SupportedFunctionReturnValue::ExecutionError(
420                                FinishedExecutionError::PermanentTimeout,
421                            );
422                            let child_finished =
423                                parent.map(|(parent_execution_id, parent_join_set)| {
424                                    ChildFinishedResponse {
425                                        parent_execution_id,
426                                        parent_join_set,
427                                        result: result.clone(),
428                                    }
429                                });
430                            (
431                                ExecutionEventInner::Finished {
432                                    result,
433                                    http_client_traces,
434                                },
435                                child_finished,
436                                version,
437                            )
438                        }
439                    }
440                    WorkerError::DbError(db_error) => {
441                        return Err(db_error);
442                    }
443                    WorkerError::ActivityTrap {
444                        reason: reason_inner,
445                        trap_kind,
446                        detail,
447                        version,
448                        http_client_traces,
449                    } => retry_or_fail(
450                        result_obtained_at,
451                        parent,
452                        can_be_retried,
453                        reason_full,
454                        reason_inner,
455                        trap_kind, // short reason like `trap` for logs
456                        detail,
457                        version,
458                        http_client_traces,
459                    ),
460                    WorkerError::ActivityPreopenedDirError {
461                        reason_kind,
462                        reason_inner,
463                        version,
464                    } => retry_or_fail(
465                        result_obtained_at,
466                        parent,
467                        can_be_retried,
468                        reason_full,
469                        reason_inner,
470                        reason_kind, // short reason for logs
471                        None,        // detail
472                        version,
473                        None, // http_client_traces
474                    ),
475                    WorkerError::ActivityReturnedError {
476                        detail,
477                        version,
478                        http_client_traces,
479                    } => {
480                        let duration = can_be_retried.expect(
481                            "ActivityReturnedError must not be returned when retries are exhausted",
482                        );
483                        let expires_at = result_obtained_at + duration;
484                        debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
485                        (
486                            ExecutionEventInner::TemporarilyFailed {
487                                backoff_expires_at: expires_at,
488                                reason_full: StrVariant::Static("activity returned error"), // is same as the variant's display message.
489                                reason_inner: StrVariant::Static("activity returned error"),
490                                detail, // contains the backtrace
491                                http_client_traces,
492                            },
493                            None,
494                            version,
495                        )
496                    }
497                    WorkerError::LimitReached {
498                        reason: inner_reason,
499                        version: new_version,
500                    } => {
501                        let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
502                        warn!(
503                            "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
504                        );
505                        (
506                            ExecutionEventInner::Unlocked {
507                                backoff_expires_at: expires_at,
508                                reason: StrVariant::from(reason_full),
509                            },
510                            None,
511                            new_version,
512                        )
513                    }
514                    WorkerError::FatalError(fatal_error, version) => {
515                        info!("Fatal worker error - {fatal_error:?}");
516                        let result = SupportedFunctionReturnValue::ExecutionError(
517                            FinishedExecutionError::from(fatal_error),
518                        );
519                        let child_finished =
520                            parent.map(|(parent_execution_id, parent_join_set)| {
521                                ChildFinishedResponse {
522                                    parent_execution_id,
523                                    parent_join_set,
524                                    result: result.clone(),
525                                }
526                            });
527                        (
528                            ExecutionEventInner::Finished {
529                                result,
530                                http_client_traces: None,
531                            },
532                            child_finished,
533                            version,
534                        )
535                    }
536                };
537                Some(Append {
538                    created_at: result_obtained_at,
539                    primary_event: AppendRequest {
540                        created_at: result_obtained_at,
541                        event: primary_event,
542                    },
543                    execution_id,
544                    version,
545                    child_finished,
546                })
547            }
548        })
549    }
550}
551
552#[expect(clippy::too_many_arguments)]
553fn retry_or_fail(
554    result_obtained_at: DateTime<Utc>,
555    parent: Option<(ExecutionId, JoinSetId)>,
556    can_be_retried: Option<Duration>,
557    reason_full: String,
558    reason_inner: String,
559    err_kind_for_logs: impl Display,
560    detail: Option<String>,
561    version: Version,
562    http_client_traces: Option<Vec<concepts::storage::http_client_trace::HttpClientTrace>>,
563) -> (ExecutionEventInner, Option<ChildFinishedResponse>, Version) {
564    if let Some(duration) = can_be_retried {
565        let expires_at = result_obtained_at + duration;
566        debug!(
567            "Retrying activity with `{err_kind_for_logs}` execution after {duration:?} at {expires_at}"
568        );
569        (
570            ExecutionEventInner::TemporarilyFailed {
571                backoff_expires_at: expires_at,
572                reason_full: StrVariant::from(reason_full),
573                reason_inner: StrVariant::from(reason_inner),
574                detail,
575                http_client_traces,
576            },
577            None,
578            version,
579        )
580    } else {
581        info!("Activity with `{err_kind_for_logs}` marked as permanent failure - {reason_inner}");
582        let result = SupportedFunctionReturnValue::ExecutionError(
583            FinishedExecutionError::PermanentFailure {
584                reason_inner,
585                reason_full,
586                kind: PermanentFailureKind::ActivityTrap,
587                detail,
588            },
589        );
590
591        let child_finished =
592            parent.map(
593                |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
594                    parent_execution_id,
595                    parent_join_set,
596                    result: result.clone(),
597                },
598            );
599        (
600            ExecutionEventInner::Finished {
601                result,
602                http_client_traces,
603            },
604            child_finished,
605            version,
606        )
607    }
608}
609
610#[derive(Debug, Clone)]
611pub(crate) struct ChildFinishedResponse {
612    pub(crate) parent_execution_id: ExecutionId,
613    pub(crate) parent_join_set: JoinSetId,
614    pub(crate) result: SupportedFunctionReturnValue,
615}
616
617#[derive(Debug, Clone)]
618pub(crate) struct Append {
619    pub(crate) created_at: DateTime<Utc>,
620    pub(crate) primary_event: AppendRequest,
621    pub(crate) execution_id: ExecutionId,
622    pub(crate) version: Version,
623    pub(crate) child_finished: Option<ChildFinishedResponse>,
624}
625
626impl Append {
627    pub(crate) async fn append(self, db_exec: &dyn DbExecutor) -> Result<(), DbErrorWrite> {
628        if let Some(child_finished) = self.child_finished {
629            assert_matches!(
630                &self.primary_event,
631                AppendRequest {
632                    event: ExecutionEventInner::Finished { .. },
633                    ..
634                }
635            );
636            let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
637            let events = AppendEventsToExecution {
638                execution_id: self.execution_id,
639                version: self.version.clone(),
640                batch: vec![self.primary_event],
641            };
642            let response = AppendResponseToExecution {
643                parent_execution_id: child_finished.parent_execution_id,
644                parent_response_event: JoinSetResponseEventOuter {
645                    created_at: self.created_at,
646                    event: JoinSetResponseEvent {
647                        join_set_id: child_finished.parent_join_set,
648                        event: JoinSetResponse::ChildExecutionFinished {
649                            child_execution_id: derived,
650                            // Since self.primary_event is a finished event, the version will remain the same.
651                            finished_version: self.version,
652                            result: child_finished.result,
653                        },
654                    },
655                },
656            };
657
658            db_exec
659                .append_batch_respond_to_parent(events, response, self.created_at)
660                .await?;
661        } else {
662            db_exec
663                .append(self.execution_id, self.version, self.primary_event)
664                .await?;
665        }
666        Ok(())
667    }
668}
669
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    //! A scripted [`Worker`] for tests: each `run` pops the next expected version and event
    //! history together with the result to return.
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes, RETURN_TYPE_DUMMY,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");

    /// Scripted runs keyed by the version the worker expects to be called with. Entries are
    /// consumed from the back (`IndexMap::pop`), hence the `_rev` field name below.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    /// Metadata of the single submittable function a `SimpleWorker` exports: default
    /// parameter types, dummy return type, no extension.
    fn exported_metadata(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
        [FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: RETURN_TYPE_DUMMY,
            extension: None,
            submittable: true,
        }]
    }

    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    impl SimpleWorker {
        /// A worker that expects one run at version 2 with an empty history and returns `res`.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let scripted = IndexMap::from([(Version::new(2), (vec![], res))]);
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(scripted)))
        }

        /// Replaces the exported function (and `ffqn`) with `ffqn`, keeping the scripted runs.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            Self {
                worker_results_rev: self.worker_results_rev,
                exported: exported_metadata(ffqn.clone()),
                ffqn,
            }
        }

        /// A worker exporting `FFQN_SOME` that plays back `worker_results_rev`.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: exported_metadata(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        /// Pops the next scripted run, asserts the context matches it, and returns its result.
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            let (expected_version, (expected_eh, worker_result)) = {
                let mut scripted = self.worker_results_rev.lock().unwrap();
                scripted.pop().unwrap()
            };
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}
749
750#[cfg(test)]
751mod tests {
752    use self::simple_worker::SimpleWorker;
753    use super::*;
754    use crate::{expired_timers_watcher, worker::WorkerResult};
755    use assert_matches::assert_matches;
756    use async_trait::async_trait;
757    use concepts::storage::DbPoolCloseable;
758    use concepts::storage::{CreateRequest, DbConnection, JoinSetRequest};
759    use concepts::storage::{ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState};
760    use concepts::time::Now;
761    use concepts::{
762        ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, RETURN_TYPE_DUMMY,
763        SUPPORTED_RETURN_VALUE_OK_EMPTY, StrVariant, SupportedFunctionReturnValue, TrapKind,
764    };
765    use db_tests::Database;
766    use indexmap::IndexMap;
767    use simple_worker::FFQN_SOME;
768    use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
769    use test_utils::set_up;
770    use test_utils::sim_clock::{ConstClock, SimClock};
771
772    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
773
774    async fn tick_fn<W: Worker + Debug, C: ClockFn + 'static>(
775        config: ExecConfig,
776        clock_fn: C,
777        db_exec: Arc<dyn DbExecutor>,
778        worker: Arc<W>,
779        executed_at: DateTime<Utc>,
780    ) -> Vec<ExecutionId> {
781        trace!("Ticking with {worker:?}");
782        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
783        let executor = ExecTask::new(worker, config, clock_fn, db_exec, ffqns);
784        executor
785            .tick_test_await(executed_at, RunId::generate())
786            .await
787    }
788
789    #[tokio::test]
790    async fn execute_simple_lifecycle_tick_based_mem() {
791        let created_at = Now.now();
792        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
793        execute_simple_lifecycle_tick_based(
794            db_pool.connection().as_ref(),
795            db_exec.clone(),
796            ConstClock(created_at),
797        )
798        .await;
799        db_close.close().await;
800    }
801
802    #[tokio::test]
803    async fn execute_simple_lifecycle_tick_based_sqlite() {
804        let created_at = Now.now();
805        let (_guard, db_pool, db_exec, db_close) = Database::Sqlite.set_up().await;
806        execute_simple_lifecycle_tick_based(
807            db_pool.connection().as_ref(),
808            db_exec.clone(),
809            ConstClock(created_at),
810        )
811        .await;
812        db_close.close().await;
813    }
814
815    async fn execute_simple_lifecycle_tick_based<C: ClockFn + 'static>(
816        db_connection: &dyn DbConnection,
817        db_exec: Arc<dyn DbExecutor>,
818        clock_fn: C,
819    ) {
820        set_up();
821        let created_at = clock_fn.now();
822        let exec_config = ExecConfig {
823            batch_size: 1,
824            lock_expiry: Duration::from_secs(1),
825            tick_sleep: Duration::from_millis(100),
826            component_id: ComponentId::dummy_activity(),
827            task_limiter: None,
828            executor_id: ExecutorId::generate(),
829        };
830
831        let execution_log = create_and_tick(
832            CreateAndTickConfig {
833                execution_id: ExecutionId::generate(),
834                created_at,
835                max_retries: 0,
836                executed_at: created_at,
837                retry_exp_backoff: Duration::ZERO,
838            },
839            clock_fn,
840            db_connection,
841            db_exec,
842            exec_config,
843            Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
844                SUPPORTED_RETURN_VALUE_OK_EMPTY,
845                Version::new(2),
846                None,
847            ))),
848            tick_fn,
849        )
850        .await;
851        assert_matches!(
852            execution_log.events.get(2).unwrap(),
853            ExecutionEvent {
854                event: ExecutionEventInner::Finished {
855                    result: SupportedFunctionReturnValue::Ok { ok: None },
856                    http_client_traces: None
857                },
858                created_at: _,
859                backtrace_id: None,
860            }
861        );
862    }
863
864    #[tokio::test]
865    async fn execute_simple_lifecycle_task_based_mem() {
866        set_up();
867        let created_at = Now.now();
868        let clock_fn = ConstClock(created_at);
869        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
870        let exec_config = ExecConfig {
871            batch_size: 1,
872            lock_expiry: Duration::from_secs(1),
873            tick_sleep: Duration::ZERO,
874            component_id: ComponentId::dummy_activity(),
875            task_limiter: None,
876            executor_id: ExecutorId::generate(),
877        };
878
879        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
880            SUPPORTED_RETURN_VALUE_OK_EMPTY,
881            Version::new(2),
882            None,
883        )));
884
885        let execution_log = create_and_tick(
886            CreateAndTickConfig {
887                execution_id: ExecutionId::generate(),
888                created_at,
889                max_retries: 0,
890                executed_at: created_at,
891                retry_exp_backoff: Duration::ZERO,
892            },
893            clock_fn,
894            db_pool.connection().as_ref(),
895            db_exec,
896            exec_config,
897            worker,
898            tick_fn,
899        )
900        .await;
901        assert_matches!(
902            execution_log.events.get(2).unwrap(),
903            ExecutionEvent {
904                event: ExecutionEventInner::Finished {
905                    result: SupportedFunctionReturnValue::Ok { ok: None },
906                    http_client_traces: None
907                },
908                created_at: _,
909                backtrace_id: None,
910            }
911        );
912        db_close.close().await;
913    }
914
915    struct CreateAndTickConfig {
916        execution_id: ExecutionId,
917        created_at: DateTime<Utc>,
918        max_retries: u32,
919        executed_at: DateTime<Utc>,
920        retry_exp_backoff: Duration,
921    }
922
923    async fn create_and_tick<
924        W: Worker,
925        C: ClockFn,
926        T: FnMut(ExecConfig, C, Arc<dyn DbExecutor>, Arc<W>, DateTime<Utc>) -> F,
927        F: Future<Output = Vec<ExecutionId>>,
928    >(
929        config: CreateAndTickConfig,
930        clock_fn: C,
931        db_connection: &dyn DbConnection,
932        db_exec: Arc<dyn DbExecutor>,
933        exec_config: ExecConfig,
934        worker: Arc<W>,
935        mut tick: T,
936    ) -> ExecutionLog {
937        // Create an execution
938        db_connection
939            .create(CreateRequest {
940                created_at: config.created_at,
941                execution_id: config.execution_id.clone(),
942                ffqn: FFQN_SOME,
943                params: Params::empty(),
944                parent: None,
945                metadata: concepts::ExecutionMetadata::empty(),
946                scheduled_at: config.created_at,
947                retry_exp_backoff: config.retry_exp_backoff,
948                max_retries: config.max_retries,
949                component_id: ComponentId::dummy_activity(),
950                scheduled_by: None,
951            })
952            .await
953            .unwrap();
954        // execute!
955        tick(exec_config, clock_fn, db_exec, worker, config.executed_at).await;
956        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
957        debug!("Execution history after tick: {execution_log:?}");
958        // check that DB contains Created and Locked events.
959        assert_matches!(
960            execution_log.events.first().unwrap(),
961            ExecutionEvent {
962                event: ExecutionEventInner::Created { .. },
963                created_at: actually_created_at,
964                backtrace_id: None,
965            }
966            if config.created_at == *actually_created_at
967        );
968        let locked_at = assert_matches!(
969            execution_log.events.get(1).unwrap(),
970            ExecutionEvent {
971                event: ExecutionEventInner::Locked { .. },
972                created_at: locked_at,
973                backtrace_id: None,
974            } if config.created_at <= *locked_at
975            => *locked_at
976        );
977        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
978            event: _,
979            created_at: executed_at,
980            backtrace_id: None,
981        } if *executed_at >= locked_at);
982        execution_log
983    }
984
985    #[tokio::test]
986    async fn activity_trap_should_trigger_an_execution_retry() {
987        set_up();
988        let sim_clock = SimClock::default();
989        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
990        let exec_config = ExecConfig {
991            batch_size: 1,
992            lock_expiry: Duration::from_secs(1),
993            tick_sleep: Duration::ZERO,
994            component_id: ComponentId::dummy_activity(),
995            task_limiter: None,
996            executor_id: ExecutorId::generate(),
997        };
998        let expected_reason = "error reason";
999        let expected_detail = "error detail";
1000        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1001            WorkerError::ActivityTrap {
1002                reason: expected_reason.to_string(),
1003                trap_kind: concepts::TrapKind::Trap,
1004                detail: Some(expected_detail.to_string()),
1005                version: Version::new(2),
1006                http_client_traces: None,
1007            },
1008        )));
1009        let retry_exp_backoff = Duration::from_millis(100);
1010        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
1011        let execution_log = create_and_tick(
1012            CreateAndTickConfig {
1013                execution_id: ExecutionId::generate(),
1014                created_at: sim_clock.now(),
1015                max_retries: 1,
1016                executed_at: sim_clock.now(),
1017                retry_exp_backoff,
1018            },
1019            sim_clock.clone(),
1020            db_pool.connection().as_ref(),
1021            db_exec.clone(),
1022            exec_config.clone(),
1023            worker,
1024            tick_fn,
1025        )
1026        .await;
1027        assert_eq!(3, execution_log.events.len());
1028        {
1029            let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
1030                &execution_log.events.get(2).unwrap(),
1031                ExecutionEvent {
1032                    event: ExecutionEventInner::TemporarilyFailed {
1033                        reason_inner,
1034                        reason_full,
1035                        detail,
1036                        backoff_expires_at,
1037                        http_client_traces: None,
1038                    },
1039                    created_at: at,
1040                    backtrace_id: None,
1041                }
1042                => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
1043            );
1044            assert_eq!(expected_reason, reason_inner.deref());
1045            assert_eq!(
1046                format!("activity trap: {expected_reason}"),
1047                reason_full.deref()
1048            );
1049            assert_eq!(Some(expected_detail), detail.as_deref());
1050            assert_eq!(at, sim_clock.now());
1051            assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
1052        }
1053        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
1054            std::sync::Mutex::new(IndexMap::from([(
1055                Version::new(4),
1056                (
1057                    vec![],
1058                    WorkerResult::Ok(SUPPORTED_RETURN_VALUE_OK_EMPTY, Version::new(4), None),
1059                ),
1060            )])),
1061        )));
1062        // noop until `retry_exp_backoff` expires
1063        assert!(
1064            tick_fn(
1065                exec_config.clone(),
1066                sim_clock.clone(),
1067                db_exec.clone(),
1068                worker.clone(),
1069                sim_clock.now(),
1070            )
1071            .await
1072            .is_empty()
1073        );
1074        // tick again to finish the execution
1075        sim_clock.move_time_forward(retry_exp_backoff);
1076        tick_fn(
1077            exec_config,
1078            sim_clock.clone(),
1079            db_exec.clone(),
1080            worker,
1081            sim_clock.now(),
1082        )
1083        .await;
1084        let execution_log = {
1085            let db_connection = db_pool.connection();
1086            db_connection
1087                .get(&execution_log.execution_id)
1088                .await
1089                .unwrap()
1090        };
1091        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
1092        assert_matches!(
1093            execution_log.events.get(3).unwrap(),
1094            ExecutionEvent {
1095                event: ExecutionEventInner::Locked { .. },
1096                created_at: at,
1097                backtrace_id: None,
1098            } if *at == sim_clock.now()
1099        );
1100        assert_matches!(
1101            execution_log.events.get(4).unwrap(),
1102            ExecutionEvent {
1103                event: ExecutionEventInner::Finished {
1104                    result: SupportedFunctionReturnValue::Ok{ok:None},
1105                    http_client_traces: None
1106                },
1107                created_at: finished_at,
1108                backtrace_id: None,
1109            } if *finished_at == sim_clock.now()
1110        );
1111        db_close.close().await;
1112    }
1113
1114    #[tokio::test]
1115    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1116        set_up();
1117        let created_at = Now.now();
1118        let clock_fn = ConstClock(created_at);
1119        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1120        let exec_config = ExecConfig {
1121            batch_size: 1,
1122            lock_expiry: Duration::from_secs(1),
1123            tick_sleep: Duration::ZERO,
1124            component_id: ComponentId::dummy_activity(),
1125            task_limiter: None,
1126            executor_id: ExecutorId::generate(),
1127        };
1128
1129        let expected_reason = "error reason";
1130        let expected_detail = "error detail";
1131        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1132            WorkerError::ActivityTrap {
1133                reason: expected_reason.to_string(),
1134                trap_kind: concepts::TrapKind::Trap,
1135                detail: Some(expected_detail.to_string()),
1136                version: Version::new(2),
1137                http_client_traces: None,
1138            },
1139        )));
1140        let execution_log = create_and_tick(
1141            CreateAndTickConfig {
1142                execution_id: ExecutionId::generate(),
1143                created_at,
1144                max_retries: 0,
1145                executed_at: created_at,
1146                retry_exp_backoff: Duration::ZERO,
1147            },
1148            clock_fn,
1149            db_pool.connection().as_ref(),
1150            db_exec,
1151            exec_config.clone(),
1152            worker,
1153            tick_fn,
1154        )
1155        .await;
1156        assert_eq!(3, execution_log.events.len());
1157        let (reason, kind, detail) = assert_matches!(
1158            &execution_log.events.get(2).unwrap(),
1159            ExecutionEvent {
1160                event: ExecutionEventInner::Finished{
1161                    result: SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_}),
1162                    http_client_traces: None
1163                },
1164                created_at: at,
1165                backtrace_id: None,
1166            } if *at == created_at
1167            => (reason_inner, kind, detail)
1168        );
1169        assert_eq!(expected_reason, *reason);
1170        assert_eq!(Some(expected_detail), detail.as_deref());
1171        assert_eq!(PermanentFailureKind::ActivityTrap, *kind);
1172
1173        db_close.close().await;
1174    }
1175
1176    #[tokio::test]
1177    async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1178        let worker_error = WorkerError::ActivityTrap {
1179            reason: "error reason".to_string(),
1180            trap_kind: TrapKind::Trap,
1181            detail: Some("detail".to_string()),
1182            version: Version::new(2),
1183            http_client_traces: None,
1184        };
1185        let expected_child_err = FinishedExecutionError::PermanentFailure {
1186            reason_full: "activity trap: error reason".to_string(),
1187            reason_inner: "error reason".to_string(),
1188            kind: PermanentFailureKind::ActivityTrap,
1189            detail: Some("detail".to_string()),
1190        };
1191        child_execution_permanently_failed_should_notify_parent(
1192            WorkerResult::Err(worker_error),
1193            expected_child_err,
1194        )
1195        .await;
1196    }
1197
1198    // TODO: Add test for WorkerError::TemporaryTimeout
1199
1200    #[tokio::test]
1201    async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1202        let expected_child_err = FinishedExecutionError::PermanentTimeout;
1203        child_execution_permanently_failed_should_notify_parent(
1204            WorkerResult::DbUpdatedByWorkerOrWatcher,
1205            expected_child_err,
1206        )
1207        .await;
1208    }
1209
1210    async fn child_execution_permanently_failed_should_notify_parent(
1211        worker_result: WorkerResult,
1212        expected_child_err: FinishedExecutionError,
1213    ) {
1214        use concepts::storage::JoinSetResponseEventOuter;
1215        const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1216
1217        set_up();
1218        let sim_clock = SimClock::default();
1219        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1220
1221        let parent_worker = Arc::new(SimpleWorker::with_single_result(
1222            WorkerResult::DbUpdatedByWorkerOrWatcher,
1223        ));
1224        let parent_execution_id = ExecutionId::generate();
1225        db_pool
1226            .connection()
1227            .create(CreateRequest {
1228                created_at: sim_clock.now(),
1229                execution_id: parent_execution_id.clone(),
1230                ffqn: FFQN_SOME,
1231                params: Params::empty(),
1232                parent: None,
1233                metadata: concepts::ExecutionMetadata::empty(),
1234                scheduled_at: sim_clock.now(),
1235                retry_exp_backoff: Duration::ZERO,
1236                max_retries: 0,
1237                component_id: ComponentId::dummy_activity(),
1238                scheduled_by: None,
1239            })
1240            .await
1241            .unwrap();
1242        tick_fn(
1243            ExecConfig {
1244                batch_size: 1,
1245                lock_expiry: LOCK_EXPIRY,
1246                tick_sleep: Duration::ZERO,
1247                component_id: ComponentId::dummy_activity(),
1248                task_limiter: None,
1249                executor_id: ExecutorId::generate(),
1250            },
1251            sim_clock.clone(),
1252            db_exec.clone(),
1253            parent_worker,
1254            sim_clock.now(),
1255        )
1256        .await;
1257
1258        let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1259        let child_execution_id = parent_execution_id.next_level(&join_set_id);
1260        // executor does not append anything, this should have been written by the worker:
1261        {
1262            let child = CreateRequest {
1263                created_at: sim_clock.now(),
1264                execution_id: ExecutionId::Derived(child_execution_id.clone()),
1265                ffqn: FFQN_CHILD,
1266                params: Params::empty(),
1267                parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1268                metadata: concepts::ExecutionMetadata::empty(),
1269                scheduled_at: sim_clock.now(),
1270                retry_exp_backoff: Duration::ZERO,
1271                max_retries: 0,
1272                component_id: ComponentId::dummy_activity(),
1273                scheduled_by: None,
1274            };
1275            let current_time = sim_clock.now();
1276            let join_set = AppendRequest {
1277                created_at: current_time,
1278                event: ExecutionEventInner::HistoryEvent {
1279                    event: HistoryEvent::JoinSetCreate {
1280                        join_set_id: join_set_id.clone(),
1281                        closing_strategy: ClosingStrategy::Complete,
1282                    },
1283                },
1284            };
1285            let child_exec_req = AppendRequest {
1286                created_at: current_time,
1287                event: ExecutionEventInner::HistoryEvent {
1288                    event: HistoryEvent::JoinSetRequest {
1289                        join_set_id: join_set_id.clone(),
1290                        request: JoinSetRequest::ChildExecutionRequest {
1291                            child_execution_id: child_execution_id.clone(),
1292                        },
1293                    },
1294                },
1295            };
1296            let join_next = AppendRequest {
1297                created_at: current_time,
1298                event: ExecutionEventInner::HistoryEvent {
1299                    event: HistoryEvent::JoinNext {
1300                        join_set_id: join_set_id.clone(),
1301                        run_expires_at: sim_clock.now(),
1302                        closing: false,
1303                        requested_ffqn: Some(FFQN_CHILD),
1304                    },
1305                },
1306            };
1307            db_pool
1308                .connection()
1309                .append_batch_create_new_execution(
1310                    current_time,
1311                    vec![join_set, child_exec_req, join_next],
1312                    parent_execution_id.clone(),
1313                    Version::new(2),
1314                    vec![child],
1315                )
1316                .await
1317                .unwrap();
1318        }
1319
1320        let child_worker =
1321            Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));
1322
1323        // execute the child
1324        tick_fn(
1325            ExecConfig {
1326                batch_size: 1,
1327                lock_expiry: LOCK_EXPIRY,
1328                tick_sleep: Duration::ZERO,
1329                component_id: ComponentId::dummy_activity(),
1330                task_limiter: None,
1331                executor_id: ExecutorId::generate(),
1332            },
1333            sim_clock.clone(),
1334            db_exec.clone(),
1335            child_worker,
1336            sim_clock.now(),
1337        )
1338        .await;
1339        if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
1340            // In case of timeout, let the timers watcher handle it
1341            sim_clock.move_time_forward(LOCK_EXPIRY);
1342            expired_timers_watcher::tick(db_pool.connection().as_ref(), sim_clock.now())
1343                .await
1344                .unwrap();
1345        }
1346        let child_log = db_pool
1347            .connection()
1348            .get(&ExecutionId::Derived(child_execution_id.clone()))
1349            .await
1350            .unwrap();
1351        assert!(child_log.pending_state.is_finished());
1352        assert_eq!(
1353            Version(2),
1354            child_log.next_version,
1355            "created = 0, locked = 1, with_single_result = 2"
1356        );
1357        assert_eq!(
1358            ExecutionEventInner::Finished {
1359                result: SupportedFunctionReturnValue::ExecutionError(expected_child_err),
1360                http_client_traces: None
1361            },
1362            child_log.last_event().event
1363        );
1364        let parent_log = db_pool
1365            .connection()
1366            .get(&parent_execution_id)
1367            .await
1368            .unwrap();
1369        assert_matches!(
1370            parent_log.pending_state,
1371            PendingState::PendingAt {
1372                scheduled_at
1373            } if scheduled_at == sim_clock.now(),
1374            "parent should be back to pending"
1375        );
1376        let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1377            parent_log.responses.last(),
1378            Some(JoinSetResponseEventOuter{
1379                created_at: at,
1380                event: JoinSetResponseEvent{
1381                    join_set_id: found_join_set_id,
1382                    event: JoinSetResponse::ChildExecutionFinished {
1383                        child_execution_id: found_child_execution_id,
1384                        finished_version,
1385                        result: found_result,
1386                    }
1387                }
1388            })
1389             if *at == sim_clock.now()
1390            => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1391        );
1392        assert_eq!(join_set_id, *found_join_set_id);
1393        assert_eq!(child_execution_id, *found_child_execution_id);
1394        assert_eq!(child_log.next_version, *child_finished_version);
1395        assert_matches!(
1396            found_result,
1397            SupportedFunctionReturnValue::ExecutionError(_)
1398        );
1399
1400        db_close.close().await;
1401    }
1402
1403    #[derive(Clone, Debug)]
1404    struct SleepyWorker {
1405        duration: Duration,
1406        result: SupportedFunctionReturnValue,
1407        exported: [FunctionMetadata; 1],
1408    }
1409
1410    #[async_trait]
1411    impl Worker for SleepyWorker {
1412        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1413            tokio::time::sleep(self.duration).await;
1414            WorkerResult::Ok(self.result.clone(), ctx.version, None)
1415        }
1416
1417        fn exported_functions(&self) -> &[FunctionMetadata] {
1418            &self.exported
1419        }
1420    }
1421
1422    #[tokio::test]
1423    async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1424        set_up();
1425        let sim_clock = SimClock::default();
1426        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1427        let lock_expiry = Duration::from_millis(100);
1428        let exec_config = ExecConfig {
1429            batch_size: 1,
1430            lock_expiry,
1431            tick_sleep: Duration::ZERO,
1432            component_id: ComponentId::dummy_activity(),
1433            task_limiter: None,
1434            executor_id: ExecutorId::generate(),
1435        };
1436
1437        let worker = Arc::new(SleepyWorker {
1438            duration: lock_expiry + Duration::from_millis(1), // sleep more than allowed by the lock expiry
1439            result: SUPPORTED_RETURN_VALUE_OK_EMPTY,
1440            exported: [FunctionMetadata {
1441                ffqn: FFQN_SOME,
1442                parameter_types: ParameterTypes::default(),
1443                return_type: RETURN_TYPE_DUMMY,
1444                extension: None,
1445                submittable: true,
1446            }],
1447        });
1448        // Create an execution
1449        let execution_id = ExecutionId::generate();
1450        let timeout_duration = Duration::from_millis(300);
1451        let db_connection = db_pool.connection();
1452        db_connection
1453            .create(CreateRequest {
1454                created_at: sim_clock.now(),
1455                execution_id: execution_id.clone(),
1456                ffqn: FFQN_SOME,
1457                params: Params::empty(),
1458                parent: None,
1459                metadata: concepts::ExecutionMetadata::empty(),
1460                scheduled_at: sim_clock.now(),
1461                retry_exp_backoff: timeout_duration,
1462                max_retries: 1,
1463                component_id: ComponentId::dummy_activity(),
1464                scheduled_by: None,
1465            })
1466            .await
1467            .unwrap();
1468
1469        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
1470        let executor = ExecTask::new(
1471            worker,
1472            exec_config,
1473            sim_clock.clone(),
1474            db_exec.clone(),
1475            ffqns,
1476        );
1477        let mut first_execution_progress = executor
1478            .tick(sim_clock.now(), RunId::generate())
1479            .await
1480            .unwrap();
1481        assert_eq!(1, first_execution_progress.executions.len());
1482        // Started hanging, wait for lock expiry.
1483        sim_clock.move_time_forward(lock_expiry);
1484        // cleanup should be called
1485        let now_after_first_lock_expiry = sim_clock.now();
1486        {
1487            debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1488            let cleanup_progress = executor
1489                .tick(now_after_first_lock_expiry, RunId::generate())
1490                .await
1491                .unwrap();
1492            assert!(cleanup_progress.executions.is_empty());
1493        }
1494        {
1495            let expired_locks = expired_timers_watcher::tick(
1496                db_pool.connection().as_ref(),
1497                now_after_first_lock_expiry,
1498            )
1499            .await
1500            .unwrap()
1501            .expired_locks;
1502            assert_eq!(1, expired_locks);
1503        }
1504        assert!(
1505            !first_execution_progress
1506                .executions
1507                .pop()
1508                .unwrap()
1509                .1
1510                .is_finished()
1511        );
1512
1513        let execution_log = db_connection.get(&execution_id).await.unwrap();
1514        let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1515        assert_matches!(
1516            &execution_log.events.get(2).unwrap(),
1517            ExecutionEvent {
1518                event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
1519                created_at: at,
1520                backtrace_id: None,
1521            } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1522        );
1523        assert_eq!(
1524            PendingState::PendingAt {
1525                scheduled_at: expected_first_timeout_expiry
1526            },
1527            execution_log.pending_state
1528        );
1529        sim_clock.move_time_forward(timeout_duration);
1530        let now_after_first_timeout = sim_clock.now();
1531        debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1532
1533        let mut second_execution_progress = executor
1534            .tick(now_after_first_timeout, RunId::generate())
1535            .await
1536            .unwrap();
1537        assert_eq!(1, second_execution_progress.executions.len());
1538
1539        // Started hanging, wait for lock expiry.
1540        sim_clock.move_time_forward(lock_expiry);
1541        // cleanup should be called
1542        let now_after_second_lock_expiry = sim_clock.now();
1543        debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1544        {
1545            let cleanup_progress = executor
1546                .tick(now_after_second_lock_expiry, RunId::generate())
1547                .await
1548                .unwrap();
1549            assert!(cleanup_progress.executions.is_empty());
1550        }
1551        {
1552            let expired_locks = expired_timers_watcher::tick(
1553                db_pool.connection().as_ref(),
1554                now_after_second_lock_expiry,
1555            )
1556            .await
1557            .unwrap()
1558            .expired_locks;
1559            assert_eq!(1, expired_locks);
1560        }
1561        assert!(
1562            !second_execution_progress
1563                .executions
1564                .pop()
1565                .unwrap()
1566                .1
1567                .is_finished()
1568        );
1569
1570        drop(db_connection);
1571        drop(executor);
1572        db_close.close().await;
1573    }
1574}