obeli_sk_executor/
executor.rs

1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::prefixed_ulid::RunId;
5use concepts::storage::{
6    AppendRequest, DbPool, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
7    LockedExecution,
8};
9use concepts::time::ClockFn;
10use concepts::{ComponentId, FinishedExecutionResult, FunctionMetadata, StrVariant};
11use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
12use concepts::{
13    FinishedExecutionError,
14    storage::{DbConnection, DbError, ExecutionEventInner, JoinSetResponse, Version},
15};
16use concepts::{JoinSetId, PermanentFailureKind};
17use std::fmt::Display;
18use std::{
19    sync::{
20        Arc,
21        atomic::{AtomicBool, Ordering},
22    },
23    time::Duration,
24};
25use tokio::task::{AbortHandle, JoinHandle};
26use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
27
28#[derive(Debug, Clone)]
29pub struct ExecConfig {
30    pub lock_expiry: Duration,
31    pub tick_sleep: Duration,
32    pub batch_size: u32,
33    pub component_id: ComponentId,
34    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
35    pub executor_id: ExecutorId,
36}
37
38pub struct ExecTask<C: ClockFn> {
39    worker: Arc<dyn Worker>,
40    config: ExecConfig,
41    clock_fn: C, // Used for obtaining current time when the execution finishes.
42    db_pool: Arc<dyn DbPool>,
43    ffqns: Arc<[FunctionFqn]>,
44}
45
46#[derive(derive_more::Debug, Default)]
47pub struct ExecutionProgress {
48    #[debug(skip)]
49    #[allow(dead_code)]
50    executions: Vec<(ExecutionId, JoinHandle<()>)>,
51}
52
53impl ExecutionProgress {
54    #[cfg(feature = "test")]
55    pub async fn wait_for_tasks(self) -> Result<usize, tokio::task::JoinError> {
56        let execs = self.executions.len();
57        for (_, handle) in self.executions {
58            handle.await?;
59        }
60        Ok(execs)
61    }
62}
63
64#[derive(derive_more::Debug)]
65pub struct ExecutorTaskHandle {
66    #[debug(skip)]
67    is_closing: Arc<AtomicBool>,
68    #[debug(skip)]
69    abort_handle: AbortHandle,
70    component_id: ComponentId,
71    executor_id: ExecutorId,
72}
73
74impl ExecutorTaskHandle {
75    #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
76    pub async fn close(&self) {
77        trace!("Gracefully closing");
78        self.is_closing.store(true, Ordering::Relaxed);
79        while !self.abort_handle.is_finished() {
80            tokio::time::sleep(Duration::from_millis(1)).await;
81        }
82        debug!("Gracefully closed");
83    }
84}
85
86impl Drop for ExecutorTaskHandle {
87    #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
88    fn drop(&mut self) {
89        if self.abort_handle.is_finished() {
90            return;
91        }
92        warn!("Aborting the executor task");
93        self.abort_handle.abort();
94    }
95}
96
97#[cfg(feature = "test")]
98pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
99    extract_exported_ffqns_noext(worker)
100}
101
102fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
103    worker
104        .exported_functions()
105        .iter()
106        .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
107        .collect::<Arc<_>>()
108}
109
110impl<C: ClockFn + 'static> ExecTask<C> {
111    #[cfg(feature = "test")]
112    pub fn new(
113        worker: Arc<dyn Worker>,
114        config: ExecConfig,
115        clock_fn: C,
116        db_pool: Arc<dyn DbPool>,
117        ffqns: Arc<[FunctionFqn]>,
118    ) -> Self {
119        Self {
120            worker,
121            config,
122            clock_fn,
123            db_pool,
124            ffqns,
125        }
126    }
127
128    pub fn spawn_new(
129        worker: Arc<dyn Worker>,
130        config: ExecConfig,
131        clock_fn: C,
132        db_pool: Arc<dyn DbPool>,
133    ) -> ExecutorTaskHandle {
134        let is_closing = Arc::new(AtomicBool::default());
135        let is_closing_inner = is_closing.clone();
136        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
137        let component_id = config.component_id.clone();
138        let executor_id = config.executor_id;
139        let abort_handle = tokio::spawn(async move {
140            debug!(executor_id = %config.executor_id, component_id = %config.component_id, "Spawned executor");
141            let task = Self {
142                worker,
143                config,
144                db_pool,
145                ffqns: ffqns.clone(),
146                clock_fn: clock_fn.clone(),
147            };
148            loop {
149                let _ = task.tick(clock_fn.now(), RunId::generate()).await;
150                let executed_at = clock_fn.now();
151                task.db_pool
152                    .connection()
153                    .wait_for_pending(executed_at, ffqns.clone(), task.config.tick_sleep)
154                    .await;
155                if is_closing_inner.load(Ordering::Relaxed) {
156                    return;
157                }
158            }
159        })
160        .abort_handle();
161        ExecutorTaskHandle {
162            is_closing,
163            abort_handle,
164            component_id,
165            executor_id,
166        }
167    }
168
169    fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
170        if let Some(task_limiter) = &self.config.task_limiter {
171            let mut locks = Vec::new();
172            for _ in 0..self.config.batch_size {
173                if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
174                    locks.push(Some(permit));
175                } else {
176                    break;
177                }
178            }
179            locks
180        } else {
181            let mut vec = Vec::with_capacity(self.config.batch_size as usize);
182            for _ in 0..self.config.batch_size {
183                vec.push(None);
184            }
185            vec
186        }
187    }
188
189    #[cfg(feature = "test")]
190    pub async fn tick_test(
191        &self,
192        executed_at: DateTime<Utc>,
193        run_id: RunId,
194    ) -> Result<ExecutionProgress, ()> {
195        self.tick(executed_at, run_id).await
196    }
197
198    #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.config.executor_id, component_id = %self.config.component_id))]
199    async fn tick(
200        &self,
201        executed_at: DateTime<Utc>,
202        run_id: RunId,
203    ) -> Result<ExecutionProgress, ()> {
204        let locked_executions = {
205            let mut permits = self.acquire_task_permits();
206            if permits.is_empty() {
207                return Ok(ExecutionProgress::default());
208            }
209            let db_connection = self.db_pool.connection();
210            let lock_expires_at = executed_at + self.config.lock_expiry;
211            let locked_executions = db_connection
212                .lock_pending(
213                    permits.len(), // batch size
214                    executed_at,   // fetch expiring before now
215                    self.ffqns.clone(),
216                    executed_at, // created at
217                    self.config.component_id.clone(),
218                    self.config.executor_id,
219                    lock_expires_at,
220                    run_id,
221                )
222                .await
223                .map_err(|err| {
224                    warn!("lock_pending error {err:?}");
225                })?;
226            // Drop permits if too many were allocated.
227            while permits.len() > locked_executions.len() {
228                permits.pop();
229            }
230            assert_eq!(permits.len(), locked_executions.len());
231            locked_executions.into_iter().zip(permits)
232        };
233        let execution_deadline = executed_at + self.config.lock_expiry;
234
235        let mut executions = Vec::with_capacity(locked_executions.len());
236        for (locked_execution, permit) in locked_executions {
237            let execution_id = locked_execution.execution_id.clone();
238            let join_handle = {
239                let worker = self.worker.clone();
240                let db_pool = self.db_pool.clone();
241                let clock_fn = self.clock_fn.clone();
242                let run_id = locked_execution.run_id;
243                let worker_span = info_span!(parent: None, "worker",
244                    "otel.name" = format!("worker {}", locked_execution.ffqn),
245                    %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.config.executor_id, component_id = %self.config.component_id);
246                locked_execution.metadata.enrich(&worker_span);
247                tokio::spawn({
248                    let worker_span2 = worker_span.clone();
249                    async move {
250                        let _permit = permit;
251                        let res = Self::run_worker(
252                            worker,
253                            db_pool.as_ref(),
254                            execution_deadline,
255                            clock_fn,
256                            locked_execution,
257                            worker_span2,
258                        )
259                        .await;
260                        if let Err(db_error) = res {
261                            error!("Got DbError `{db_error:?}`, expecting watcher to mark execution as timed out");
262                        }
263                    }
264                    .instrument(worker_span)
265                })
266            };
267            executions.push((execution_id, join_handle));
268        }
269        Ok(ExecutionProgress { executions })
270    }
271
272    async fn run_worker(
273        worker: Arc<dyn Worker>,
274        db_pool: &dyn DbPool,
275        execution_deadline: DateTime<Utc>,
276        clock_fn: C,
277        locked_execution: LockedExecution,
278        worker_span: Span,
279    ) -> Result<(), DbError> {
280        debug!("Worker::run starting");
281        trace!(
282            version = %locked_execution.version,
283            params = ?locked_execution.params,
284            event_history = ?locked_execution.event_history,
285            "Worker::run starting"
286        );
287        let can_be_retried = ExecutionLog::can_be_retried_after(
288            locked_execution.temporary_event_count + 1,
289            locked_execution.max_retries,
290            locked_execution.retry_exp_backoff,
291        );
292        let unlock_expiry_on_limit_reached =
293            ExecutionLog::compute_retry_duration_when_retrying_forever(
294                locked_execution.temporary_event_count + 1,
295                locked_execution.retry_exp_backoff,
296            );
297        let ctx = WorkerContext {
298            execution_id: locked_execution.execution_id.clone(),
299            metadata: locked_execution.metadata,
300            ffqn: locked_execution.ffqn,
301            params: locked_execution.params,
302            event_history: locked_execution.event_history,
303            responses: locked_execution
304                .responses
305                .into_iter()
306                .map(|outer| outer.event)
307                .collect(),
308            version: locked_execution.version,
309            execution_deadline,
310            can_be_retried: can_be_retried.is_some(),
311            run_id: locked_execution.run_id,
312            worker_span,
313        };
314        let worker_result = worker.run(ctx).await;
315        trace!(?worker_result, "Worker::run finished");
316        let result_obtained_at = clock_fn.now();
317        match Self::worker_result_to_execution_event(
318            locked_execution.execution_id,
319            worker_result,
320            result_obtained_at,
321            locked_execution.parent,
322            can_be_retried,
323            unlock_expiry_on_limit_reached,
324        )? {
325            Some(append) => {
326                let db_connection = db_pool.connection();
327                trace!("Appending {append:?}");
328                append.append(db_connection.as_ref()).await
329            }
330            None => Ok(()),
331        }
332    }
333
334    /// Map the `WorkerError` to an temporary or a permanent failure.
335    fn worker_result_to_execution_event(
336        execution_id: ExecutionId,
337        worker_result: WorkerResult,
338        result_obtained_at: DateTime<Utc>,
339        parent: Option<(ExecutionId, JoinSetId)>,
340        can_be_retried: Option<Duration>,
341        unlock_expiry_on_limit_reached: Duration,
342    ) -> Result<Option<Append>, DbError> {
343        Ok(match worker_result {
344            WorkerResult::Ok(result, new_version, http_client_traces) => {
345                info!(
346                    "Execution finished: {}",
347                    result.as_pending_state_finished_result()
348                );
349                let child_finished =
350                    parent.map(
351                        |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
352                            parent_execution_id,
353                            parent_join_set,
354                            result: Ok(result.clone()),
355                        },
356                    );
357                let primary_event = AppendRequest {
358                    created_at: result_obtained_at,
359                    event: ExecutionEventInner::Finished {
360                        result: Ok(result),
361                        http_client_traces,
362                    },
363                };
364
365                Some(Append {
366                    created_at: result_obtained_at,
367                    primary_event,
368                    execution_id,
369                    version: new_version,
370                    child_finished,
371                })
372            }
373            WorkerResult::DbUpdatedByWorkerOrWatcher => None,
374            WorkerResult::Err(err) => {
375                let reason_full = err.to_string(); // WorkerError.display() usually contains a variant specific prefix + inner `reason`.
376                let (primary_event, child_finished, version) = match err {
377                    WorkerError::TemporaryTimeout {
378                        http_client_traces,
379                        version,
380                    } => {
381                        if let Some(duration) = can_be_retried {
382                            let backoff_expires_at = result_obtained_at + duration;
383                            info!(
384                                "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
385                            );
386                            (
387                                ExecutionEventInner::TemporarilyTimedOut {
388                                    backoff_expires_at,
389                                    http_client_traces,
390                                },
391                                None,
392                                version,
393                            )
394                        } else {
395                            info!("Permanent timeout");
396                            let result = Err(FinishedExecutionError::PermanentTimeout);
397                            let child_finished =
398                                parent.map(|(parent_execution_id, parent_join_set)| {
399                                    ChildFinishedResponse {
400                                        parent_execution_id,
401                                        parent_join_set,
402                                        result: result.clone(),
403                                    }
404                                });
405                            (
406                                ExecutionEventInner::Finished {
407                                    result,
408                                    http_client_traces,
409                                },
410                                child_finished,
411                                version,
412                            )
413                        }
414                    }
415                    WorkerError::DbError(db_error) => {
416                        return Err(db_error);
417                    }
418                    WorkerError::ActivityTrap {
419                        reason: reason_inner,
420                        trap_kind,
421                        detail,
422                        version,
423                        http_client_traces,
424                    } => retry_or_fail(
425                        result_obtained_at,
426                        parent,
427                        can_be_retried,
428                        reason_full,
429                        reason_inner,
430                        trap_kind, // short reason like `trap` for logs
431                        detail,
432                        version,
433                        http_client_traces,
434                    ),
435                    WorkerError::ActivityPreopenedDirError {
436                        reason_kind,
437                        reason_inner,
438                        version,
439                    } => retry_or_fail(
440                        result_obtained_at,
441                        parent,
442                        can_be_retried,
443                        reason_full,
444                        reason_inner,
445                        reason_kind, // short reason for logs
446                        None,        // detail
447                        version,
448                        None, // http_client_traces
449                    ),
450                    WorkerError::ActivityReturnedError {
451                        detail,
452                        version,
453                        http_client_traces,
454                    } => {
455                        let duration = can_be_retried.expect(
456                            "ActivityReturnedError must not be returned when retries are exhausted",
457                        );
458                        let expires_at = result_obtained_at + duration;
459                        debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
460                        (
461                            ExecutionEventInner::TemporarilyFailed {
462                                backoff_expires_at: expires_at,
463                                reason_full: StrVariant::Static("activity returned error"), // is same as the variant's display message.
464                                reason_inner: StrVariant::Static("activity returned error"),
465                                detail, // contains the backtrace
466                                http_client_traces,
467                            },
468                            None,
469                            version,
470                        )
471                    }
472                    WorkerError::LimitReached {
473                        reason: inner_reason,
474                        version: new_version,
475                    } => {
476                        let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
477                        warn!(
478                            "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
479                        );
480                        (
481                            ExecutionEventInner::Unlocked {
482                                backoff_expires_at: expires_at,
483                                reason: StrVariant::from(reason_full),
484                            },
485                            None,
486                            new_version,
487                        )
488                    }
489                    WorkerError::FatalError(fatal_error, version) => {
490                        info!("Fatal worker error - {fatal_error:?}");
491                        let result = Err(FinishedExecutionError::from(fatal_error));
492                        let child_finished =
493                            parent.map(|(parent_execution_id, parent_join_set)| {
494                                ChildFinishedResponse {
495                                    parent_execution_id,
496                                    parent_join_set,
497                                    result: result.clone(),
498                                }
499                            });
500                        (
501                            ExecutionEventInner::Finished {
502                                result,
503                                http_client_traces: None,
504                            },
505                            child_finished,
506                            version,
507                        )
508                    }
509                };
510                Some(Append {
511                    created_at: result_obtained_at,
512                    primary_event: AppendRequest {
513                        created_at: result_obtained_at,
514                        event: primary_event,
515                    },
516                    execution_id,
517                    version,
518                    child_finished,
519                })
520            }
521        })
522    }
523}
524
525#[expect(clippy::too_many_arguments)]
526fn retry_or_fail(
527    result_obtained_at: DateTime<Utc>,
528    parent: Option<(ExecutionId, JoinSetId)>,
529    can_be_retried: Option<Duration>,
530    reason_full: String,
531    reason_inner: String,
532    err_kind_for_logs: impl Display,
533    detail: Option<String>,
534    version: Version,
535    http_client_traces: Option<Vec<concepts::storage::http_client_trace::HttpClientTrace>>,
536) -> (ExecutionEventInner, Option<ChildFinishedResponse>, Version) {
537    if let Some(duration) = can_be_retried {
538        let expires_at = result_obtained_at + duration;
539        debug!(
540            "Retrying activity with `{err_kind_for_logs}` execution after {duration:?} at {expires_at}"
541        );
542        (
543            ExecutionEventInner::TemporarilyFailed {
544                backoff_expires_at: expires_at,
545                reason_full: StrVariant::from(reason_full),
546                reason_inner: StrVariant::from(reason_inner),
547                detail,
548                http_client_traces,
549            },
550            None,
551            version,
552        )
553    } else {
554        info!("Activity with `{err_kind_for_logs}` marked as permanent failure - {reason_inner}");
555        let result = Err(FinishedExecutionError::PermanentFailure {
556            reason_inner,
557            reason_full,
558            kind: PermanentFailureKind::ActivityTrap,
559            detail,
560        });
561        let child_finished =
562            parent.map(
563                |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
564                    parent_execution_id,
565                    parent_join_set,
566                    result: result.clone(),
567                },
568            );
569        (
570            ExecutionEventInner::Finished {
571                result,
572                http_client_traces,
573            },
574            child_finished,
575            version,
576        )
577    }
578}
579
580#[derive(Debug, Clone)]
581pub(crate) struct ChildFinishedResponse {
582    pub(crate) parent_execution_id: ExecutionId,
583    pub(crate) parent_join_set: JoinSetId,
584    pub(crate) result: FinishedExecutionResult,
585}
586
587#[derive(Debug, Clone)]
588pub(crate) struct Append {
589    pub(crate) created_at: DateTime<Utc>,
590    pub(crate) primary_event: AppendRequest,
591    pub(crate) execution_id: ExecutionId,
592    pub(crate) version: Version,
593    pub(crate) child_finished: Option<ChildFinishedResponse>,
594}
595
596impl Append {
597    pub(crate) async fn append(self, db_connection: &dyn DbConnection) -> Result<(), DbError> {
598        if let Some(child_finished) = self.child_finished {
599            assert_matches!(
600                &self.primary_event,
601                AppendRequest {
602                    event: ExecutionEventInner::Finished { .. },
603                    ..
604                }
605            );
606            let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
607            db_connection
608                .append_batch_respond_to_parent(
609                    derived.clone(),
610                    self.created_at,
611                    vec![self.primary_event],
612                    self.version.clone(),
613                    child_finished.parent_execution_id,
614                    JoinSetResponseEventOuter {
615                        created_at: self.created_at,
616                        event: JoinSetResponseEvent {
617                            join_set_id: child_finished.parent_join_set,
618                            event: JoinSetResponse::ChildExecutionFinished {
619                                child_execution_id: derived,
620                                // Since self.primary_event is a finished event, the version will remain the same.
621                                finished_version: self.version,
622                                result: child_finished.result,
623                            },
624                        },
625                    },
626                )
627                .await?;
628        } else {
629            db_connection
630                .append_batch(
631                    self.created_at,
632                    vec![self.primary_event],
633                    self.execution_id,
634                    self.version,
635                )
636                .await?;
637        }
638        Ok(())
639    }
640}
641
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");

    /// Scripted runs: the version each run expects, mapped to the expected event history and
    /// the result to return. `SimpleWorker::run` consumes entries from the end of the map.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    /// Test worker that replays scripted results and asserts on the context it is given.
    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    /// Metadata for a single submittable export named `ffqn`, with default parameter
    /// types, no return type and no extension.
    fn single_export(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
        [FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: None,
            extension: None,
            submittable: true,
        }]
    }

    impl SimpleWorker {
        /// Worker scripted with one run that expects version 2 and an empty event history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let script = IndexMap::from([(Version::new(2), (vec![], res))]);
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(script)))
        }

        /// Exports `ffqn` instead of the current function, keeping the scripted results.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            Self {
                worker_results_rev: self.worker_results_rev,
                exported: single_export(ffqn.clone()),
                ffqn,
            }
        }

        /// Worker exporting `FFQN_SOME` that replays `worker_results_rev`.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: single_export(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            let mut scripted = self.worker_results_rev.lock().unwrap();
            let (expected_version, (expected_eh, worker_result)) = scripted.pop().unwrap();
            // Release the lock before asserting so a failed assertion does not poison it.
            drop(scripted);
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}
721
722#[cfg(test)]
723mod tests {
724    use self::simple_worker::SimpleWorker;
725    use super::*;
726    use crate::worker::FatalError;
727    use crate::{expired_timers_watcher, worker::WorkerResult};
728    use assert_matches::assert_matches;
729    use async_trait::async_trait;
730    use concepts::storage::{CreateRequest, JoinSetRequest};
731    use concepts::storage::{ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState};
732    use concepts::time::Now;
733    use concepts::{
734        ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, StrVariant,
735        SupportedFunctionReturnValue, TrapKind,
736    };
737    use db_tests::Database;
738    use indexmap::IndexMap;
739    use simple_worker::FFQN_SOME;
740    use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
741    use test_utils::set_up;
742    use test_utils::sim_clock::{ConstClock, SimClock};
743
744    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
745
746    async fn tick_fn<W: Worker + Debug, C: ClockFn + 'static>(
747        config: ExecConfig,
748        clock_fn: C,
749        db_pool: Arc<dyn DbPool>,
750        worker: Arc<W>,
751        executed_at: DateTime<Utc>,
752    ) -> ExecutionProgress {
753        trace!("Ticking with {worker:?}");
754        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
755        let executor = ExecTask::new(worker, config, clock_fn, db_pool, ffqns);
756        let mut execution_progress = executor.tick(executed_at, RunId::generate()).await.unwrap();
757        loop {
758            execution_progress
759                .executions
760                .retain(|(_, abort_handle)| !abort_handle.is_finished());
761            if execution_progress.executions.is_empty() {
762                return execution_progress;
763            }
764            tokio::time::sleep(Duration::from_millis(10)).await;
765        }
766    }
767
768    #[tokio::test]
769    async fn execute_simple_lifecycle_tick_based_mem() {
770        let created_at = Now.now();
771        let (_guard, db_pool) = Database::Memory.set_up().await;
772        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
773        db_pool.close().await.unwrap();
774    }
775
776    #[tokio::test]
777    async fn execute_simple_lifecycle_tick_based_sqlite() {
778        let created_at = Now.now();
779        let (_guard, db_pool) = Database::Sqlite.set_up().await;
780        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
781        db_pool.close().await.unwrap();
782    }
783
784    async fn execute_simple_lifecycle_tick_based<C: ClockFn + 'static>(
785        pool: Arc<dyn DbPool>,
786        clock_fn: C,
787    ) {
788        set_up();
789        let created_at = clock_fn.now();
790        let exec_config = ExecConfig {
791            batch_size: 1,
792            lock_expiry: Duration::from_secs(1),
793            tick_sleep: Duration::from_millis(100),
794            component_id: ComponentId::dummy_activity(),
795            task_limiter: None,
796            executor_id: ExecutorId::generate(),
797        };
798
799        let execution_log = create_and_tick(
800            CreateAndTickConfig {
801                execution_id: ExecutionId::generate(),
802                created_at,
803                max_retries: 0,
804                executed_at: created_at,
805                retry_exp_backoff: Duration::ZERO,
806            },
807            clock_fn,
808            pool,
809            exec_config,
810            Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
811                SupportedFunctionReturnValue::None,
812                Version::new(2),
813                None,
814            ))),
815            tick_fn,
816        )
817        .await;
818        assert_matches!(
819            execution_log.events.get(2).unwrap(),
820            ExecutionEvent {
821                event: ExecutionEventInner::Finished {
822                    result: Ok(SupportedFunctionReturnValue::None),
823                    http_client_traces: None
824                },
825                created_at: _,
826                backtrace_id: None,
827            }
828        );
829    }
830
831    #[tokio::test]
832    async fn stochastic_execute_simple_lifecycle_task_based_mem() {
833        set_up();
834        let created_at = Now.now();
835        let clock_fn = ConstClock(created_at);
836        let (_guard, db_pool) = Database::Memory.set_up().await;
837        let exec_config = ExecConfig {
838            batch_size: 1,
839            lock_expiry: Duration::from_secs(1),
840            tick_sleep: Duration::ZERO,
841            component_id: ComponentId::dummy_activity(),
842            task_limiter: None,
843            executor_id: ExecutorId::generate(),
844        };
845
846        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
847            SupportedFunctionReturnValue::None,
848            Version::new(2),
849            None,
850        )));
851        let exec_task = ExecTask::spawn_new(
852            worker.clone(),
853            exec_config.clone(),
854            clock_fn,
855            db_pool.clone(),
856        );
857
858        let execution_log = create_and_tick(
859            CreateAndTickConfig {
860                execution_id: ExecutionId::generate(),
861                created_at,
862                max_retries: 0,
863                executed_at: created_at,
864                retry_exp_backoff: Duration::ZERO,
865            },
866            clock_fn,
867            db_pool.clone(),
868            exec_config,
869            worker,
870            |_, _, _, _, _| async {
871                tokio::time::sleep(Duration::from_secs(1)).await; // FIXME: non determinism, possible race
872                ExecutionProgress::default()
873            },
874        )
875        .await;
876        exec_task.close().await;
877        db_pool.close().await.unwrap();
878        assert_matches!(
879            execution_log.events.get(2).unwrap(),
880            ExecutionEvent {
881                event: ExecutionEventInner::Finished {
882                    result: Ok(SupportedFunctionReturnValue::None),
883                    http_client_traces: None
884                },
885                created_at: _,
886                backtrace_id: None,
887            }
888        );
889    }
890
891    struct CreateAndTickConfig {
892        execution_id: ExecutionId,
893        created_at: DateTime<Utc>,
894        max_retries: u32,
895        executed_at: DateTime<Utc>,
896        retry_exp_backoff: Duration,
897    }
898
899    async fn create_and_tick<
900        W: Worker,
901        C: ClockFn,
902        T: FnMut(ExecConfig, C, Arc<dyn DbPool>, Arc<W>, DateTime<Utc>) -> F,
903        F: Future<Output = ExecutionProgress>,
904    >(
905        config: CreateAndTickConfig,
906        clock_fn: C,
907        db_pool: Arc<dyn DbPool>,
908        exec_config: ExecConfig,
909        worker: Arc<W>,
910        mut tick: T,
911    ) -> ExecutionLog {
912        // Create an execution
913        let db_connection = db_pool.connection();
914        db_connection
915            .create(CreateRequest {
916                created_at: config.created_at,
917                execution_id: config.execution_id.clone(),
918                ffqn: FFQN_SOME,
919                params: Params::empty(),
920                parent: None,
921                metadata: concepts::ExecutionMetadata::empty(),
922                scheduled_at: config.created_at,
923                retry_exp_backoff: config.retry_exp_backoff,
924                max_retries: config.max_retries,
925                component_id: ComponentId::dummy_activity(),
926                scheduled_by: None,
927            })
928            .await
929            .unwrap();
930        // execute!
931        tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
932        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
933        debug!("Execution history after tick: {execution_log:?}");
934        // check that DB contains Created and Locked events.
935        assert_matches!(
936            execution_log.events.first().unwrap(),
937            ExecutionEvent {
938                event: ExecutionEventInner::Created { .. },
939                created_at: actually_created_at,
940                backtrace_id: None,
941            }
942            if config.created_at == *actually_created_at
943        );
944        let locked_at = assert_matches!(
945            execution_log.events.get(1).unwrap(),
946            ExecutionEvent {
947                event: ExecutionEventInner::Locked { .. },
948                created_at: locked_at,
949                backtrace_id: None,
950            } if config.created_at <= *locked_at
951            => *locked_at
952        );
953        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
954            event: _,
955            created_at: executed_at,
956            backtrace_id: None,
957        } if *executed_at >= locked_at);
958        execution_log
959    }
960
    /// An activity trap on an execution with `max_retries: 1` must be recorded as
    /// `TemporarilyFailed`, with the backoff expiring `retry_exp_backoff` after the failure.
    /// A tick before the backoff elapses must not spawn anything. After the simulated clock
    /// moves past the backoff, the next tick locks the execution again and finishes it.
    #[tokio::test]
    async fn activity_trap_should_trigger_an_execution_retry() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
        };
        let expected_reason = "error reason";
        let expected_detail = "error detail";
        // First run: the worker traps. Its result carries version 2, following the Created
        // (index 0) and Locked (index 1) events checked by `create_and_tick`.
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: Some(expected_detail.to_string()),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        let retry_exp_backoff = Duration::from_millis(100);
        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at: sim_clock.now(),
                max_retries: 1,
                executed_at: sim_clock.now(),
                retry_exp_backoff,
            },
            sim_clock.clone(),
            db_pool.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        // Created, Locked and the failure event.
        assert_eq!(3, execution_log.events.len());
        {
            let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
                &execution_log.events.get(2).unwrap(),
                ExecutionEvent {
                    event: ExecutionEventInner::TemporarilyFailed {
                        reason_inner,
                        reason_full,
                        detail,
                        backoff_expires_at,
                        http_client_traces: None,
                    },
                    created_at: at,
                    backtrace_id: None,
                }
                => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
            );
            assert_eq!(expected_reason, reason_inner.deref());
            assert_eq!(
                format!("activity trap: {expected_reason}"),
                reason_full.deref()
            );
            assert_eq!(Some(expected_detail), detail.as_deref());
            // The failure is stamped with the current simulated time, and the retry becomes
            // eligible `retry_exp_backoff` later.
            assert_eq!(at, sim_clock.now());
            assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
        }
        // Worker for the retry: events 0..=2 already exist and the retry's Locked event is
        // expected at index 3, so the successful result is registered for version 4.
        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
            std::sync::Mutex::new(IndexMap::from([(
                Version::new(4),
                (
                    vec![],
                    WorkerResult::Ok(SupportedFunctionReturnValue::None, Version::new(4), None),
                ),
            )])),
        )));
        // noop until `retry_exp_backoff` expires
        assert!(
            tick_fn(
                exec_config.clone(),
                sim_clock.clone(),
                db_pool.clone(),
                worker.clone(),
                sim_clock.now(),
            )
            .await
            .executions
            .is_empty()
        );
        // tick again to finish the execution
        sim_clock.move_time_forward(retry_exp_backoff);
        tick_fn(
            exec_config,
            sim_clock.clone(),
            db_pool.clone(),
            worker,
            sim_clock.now(),
        )
        .await;
        let execution_log = {
            let db_connection = db_pool.connection();
            db_connection
                .get(&execution_log.execution_id)
                .await
                .unwrap()
        };
        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
        // The retry is locked and finished at the moved-forward simulated time.
        assert_matches!(
            execution_log.events.get(3).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: at,
                backtrace_id: None,
            } if *at == sim_clock.now()
        );
        assert_matches!(
            execution_log.events.get(4).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Ok(SupportedFunctionReturnValue::None),
                    http_client_traces: None
                },
                created_at: finished_at,
                backtrace_id: None,
            } if *finished_at == sim_clock.now()
        );
        db_pool.close().await.unwrap();
    }
1089
1090    #[tokio::test]
1091    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1092        set_up();
1093        let created_at = Now.now();
1094        let clock_fn = ConstClock(created_at);
1095        let (_guard, db_pool) = Database::Memory.set_up().await;
1096        let exec_config = ExecConfig {
1097            batch_size: 1,
1098            lock_expiry: Duration::from_secs(1),
1099            tick_sleep: Duration::ZERO,
1100            component_id: ComponentId::dummy_activity(),
1101            task_limiter: None,
1102            executor_id: ExecutorId::generate(),
1103        };
1104
1105        let expected_reason = "error reason";
1106        let expected_detail = "error detail";
1107        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1108            WorkerError::ActivityTrap {
1109                reason: expected_reason.to_string(),
1110                trap_kind: concepts::TrapKind::Trap,
1111                detail: Some(expected_detail.to_string()),
1112                version: Version::new(2),
1113                http_client_traces: None,
1114            },
1115        )));
1116        let execution_log = create_and_tick(
1117            CreateAndTickConfig {
1118                execution_id: ExecutionId::generate(),
1119                created_at,
1120                max_retries: 0,
1121                executed_at: created_at,
1122                retry_exp_backoff: Duration::ZERO,
1123            },
1124            clock_fn,
1125            db_pool.clone(),
1126            exec_config.clone(),
1127            worker,
1128            tick_fn,
1129        )
1130        .await;
1131        assert_eq!(3, execution_log.events.len());
1132        let (reason, kind, detail) = assert_matches!(
1133            &execution_log.events.get(2).unwrap(),
1134            ExecutionEvent {
1135                event: ExecutionEventInner::Finished{
1136                    result: Err(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_}),
1137                    http_client_traces: None
1138                },
1139                created_at: at,
1140                backtrace_id: None,
1141            } if *at == created_at
1142            => (reason_inner, kind, detail)
1143        );
1144        assert_eq!(expected_reason, *reason);
1145        assert_eq!(Some(expected_detail), detail.as_deref());
1146        assert_eq!(PermanentFailureKind::ActivityTrap, *kind);
1147
1148        db_pool.close().await.unwrap();
1149    }
1150
1151    #[tokio::test]
1152    async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1153        let worker_error = WorkerError::ActivityTrap {
1154            reason: "error reason".to_string(),
1155            trap_kind: TrapKind::Trap,
1156            detail: Some("detail".to_string()),
1157            version: Version::new(2),
1158            http_client_traces: None,
1159        };
1160        let expected_child_err = FinishedExecutionError::PermanentFailure {
1161            reason_full: "activity trap: error reason".to_string(),
1162            reason_inner: "error reason".to_string(),
1163            kind: PermanentFailureKind::ActivityTrap,
1164            detail: Some("detail".to_string()),
1165        };
1166        child_execution_permanently_failed_should_notify_parent(
1167            WorkerResult::Err(worker_error),
1168            expected_child_err,
1169        )
1170        .await;
1171    }
1172
1173    // TODO: Add test for WorkerError::TemporaryTimeout
1174
1175    #[tokio::test]
1176    async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1177        let expected_child_err = FinishedExecutionError::PermanentTimeout;
1178        child_execution_permanently_failed_should_notify_parent(
1179            WorkerResult::DbUpdatedByWorkerOrWatcher,
1180            expected_child_err,
1181        )
1182        .await;
1183    }
1184
1185    #[tokio::test]
1186    async fn child_execution_permanently_failed_should_notify_parent_unhandled_child() {
1187        let parent_id = ExecutionId::from_parts(1, 1);
1188        let join_set_id_outer =
1189            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("outer")).unwrap();
1190        let root_cause_id = parent_id.next_level(&join_set_id_outer);
1191        let join_set_id_inner =
1192            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("inner")).unwrap();
1193        let child_execution_id = root_cause_id.next_level(&join_set_id_inner);
1194        let worker_error = WorkerError::FatalError(
1195            FatalError::UnhandledChildExecutionError {
1196                child_execution_id: child_execution_id.clone(),
1197                root_cause_id: root_cause_id.clone(),
1198            },
1199            Version::new(2),
1200        );
1201        let expected_child_err = FinishedExecutionError::UnhandledChildExecutionError {
1202            child_execution_id,
1203            root_cause_id,
1204        };
1205        child_execution_permanently_failed_should_notify_parent(
1206            WorkerResult::Err(worker_error),
1207            expected_child_err,
1208        )
1209        .await;
1210    }
1211
    /// Shared scenario for the child-failure tests.
    ///
    /// 1. Creates a parent execution and ticks it. Its worker returns
    ///    `DbUpdatedByWorkerOrWatcher`.
    /// 2. Appends, on the parent's behalf, a one-off `JoinSetCreate`, a `ChildExecutionRequest`
    ///    and a `JoinNext`, creating the child in the same batch.
    /// 3. Ticks the child with `worker_result`. When `expected_child_err` is
    ///    `PermanentTimeout`, the clock is then moved by the lock expiry and the
    ///    expired-timers watcher is ticked.
    /// 4. Asserts that the child finished with `expected_child_err`, that the parent is pending
    ///    again at the current time, and that the parent's last response is a matching
    ///    `ChildExecutionFinished`.
    async fn child_execution_permanently_failed_should_notify_parent(
        worker_result: WorkerResult,
        expected_child_err: FinishedExecutionError,
    ) {
        use concepts::storage::JoinSetResponseEventOuter;
        const LOCK_EXPIRY: Duration = Duration::from_secs(1);

        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;

        let parent_worker = Arc::new(SimpleWorker::with_single_result(
            WorkerResult::DbUpdatedByWorkerOrWatcher,
        ));
        let parent_execution_id = ExecutionId::generate();
        db_pool
            .connection()
            .create(CreateRequest {
                created_at: sim_clock.now(),
                execution_id: parent_execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: Duration::ZERO,
                max_retries: 0,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();
        // Lock the parent by running its worker once.
        tick_fn(
            ExecConfig {
                batch_size: 1,
                lock_expiry: LOCK_EXPIRY,
                tick_sleep: Duration::ZERO,
                component_id: ComponentId::dummy_activity(),
                task_limiter: None,
                executor_id: ExecutorId::generate(),
            },
            sim_clock.clone(),
            db_pool.clone(),
            parent_worker,
            sim_clock.now(),
        )
        .await;

        let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
        let child_execution_id = parent_execution_id.next_level(&join_set_id);
        // executor does not append anything, this should have been written by the worker:
        {
            let child = CreateRequest {
                created_at: sim_clock.now(),
                execution_id: ExecutionId::Derived(child_execution_id.clone()),
                ffqn: FFQN_CHILD,
                params: Params::empty(),
                parent: Some((parent_execution_id.clone(), join_set_id.clone())),
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: Duration::ZERO,
                max_retries: 0,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            };
            let current_time = sim_clock.now();
            let join_set = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinSetCreate {
                        join_set_id: join_set_id.clone(),
                        closing_strategy: ClosingStrategy::Complete,
                    },
                },
            };
            let child_exec_req = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinSetRequest {
                        join_set_id: join_set_id.clone(),
                        request: JoinSetRequest::ChildExecutionRequest {
                            child_execution_id: child_execution_id.clone(),
                        },
                    },
                },
            };
            let join_next = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinNext {
                        join_set_id: join_set_id.clone(),
                        run_expires_at: sim_clock.now(),
                        closing: false,
                        requested_ffqn: Some(FFQN_CHILD),
                    },
                },
            };
            // Append the three parent events at version 2 and create the child in one call.
            db_pool
                .connection()
                .append_batch_create_new_execution(
                    current_time,
                    vec![join_set, child_exec_req, join_next],
                    parent_execution_id.clone(),
                    Version::new(2),
                    vec![child],
                )
                .await
                .unwrap();
        }

        let child_worker =
            Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));

        // execute the child
        tick_fn(
            ExecConfig {
                batch_size: 1,
                lock_expiry: LOCK_EXPIRY,
                tick_sleep: Duration::ZERO,
                component_id: ComponentId::dummy_activity(),
                task_limiter: None,
                executor_id: ExecutorId::generate(),
            },
            sim_clock.clone(),
            db_pool.clone(),
            child_worker,
            sim_clock.now(),
        )
        .await;
        if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
            // In case of timeout, let the timers watcher handle it
            sim_clock.move_time_forward(LOCK_EXPIRY);
            expired_timers_watcher::tick(db_pool.connection().as_ref(), sim_clock.now())
                .await
                .unwrap();
        }
        // The child must be finished with exactly the expected error.
        let child_log = db_pool
            .connection()
            .get(&ExecutionId::Derived(child_execution_id.clone()))
            .await
            .unwrap();
        assert!(child_log.pending_state.is_finished());
        assert_eq!(
            Version(2),
            child_log.next_version,
            "created = 0, locked = 1, with_single_result = 2"
        );
        assert_eq!(
            ExecutionEventInner::Finished {
                result: Err(expected_child_err),
                http_client_traces: None
            },
            child_log.last_event().event
        );
        // The parent must be pending again and must have received the child's result.
        let parent_log = db_pool
            .connection()
            .get(&parent_execution_id)
            .await
            .unwrap();
        assert_matches!(
            parent_log.pending_state,
            PendingState::PendingAt {
                scheduled_at
            } if scheduled_at == sim_clock.now(),
            "parent should be back to pending"
        );
        let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
            parent_log.responses.last(),
            Some(JoinSetResponseEventOuter{
                created_at: at,
                event: JoinSetResponseEvent{
                    join_set_id: found_join_set_id,
                    event: JoinSetResponse::ChildExecutionFinished {
                        child_execution_id: found_child_execution_id,
                        finished_version,
                        result: found_result,
                    }
                }
            })
             if *at == sim_clock.now()
            => (found_join_set_id, found_child_execution_id, finished_version, found_result)
        );
        assert_eq!(join_set_id, *found_join_set_id);
        assert_eq!(child_execution_id, *found_child_execution_id);
        assert_eq!(child_log.next_version, *child_finished_version);
        assert!(found_result.is_err());

        db_pool.close().await.unwrap();
    }
1401
1402    #[derive(Clone, Debug)]
1403    struct SleepyWorker {
1404        duration: Duration,
1405        result: SupportedFunctionReturnValue,
1406        exported: [FunctionMetadata; 1],
1407    }
1408
1409    #[async_trait]
1410    impl Worker for SleepyWorker {
1411        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1412            tokio::time::sleep(self.duration).await;
1413            WorkerResult::Ok(self.result.clone(), ctx.version, None)
1414        }
1415
1416        fn exported_functions(&self) -> &[FunctionMetadata] {
1417            &self.exported
1418        }
1419    }
1420
    /// An execution whose worker keeps running past the lock expiry must be cleaned up by
    /// the expired-timers watcher.
    ///
    /// With `max_retries: 1`, the first expiry produces `TemporarilyTimedOut`, whose backoff
    /// expires `timeout_duration` later. The retried run hangs again and its lock is expired
    /// a second time.
    ///
    /// NOTE(review): the lock expiry is measured on `sim_clock`, but `SleepyWorker` sleeps in
    /// real tokio time (`lock_expiry` + 1ms). The `!is_finished()` assertions therefore assume
    /// the steps between a tick and its assertion complete within about 101ms of wall-clock
    /// time. They could be flaky on a heavily loaded machine.
    ///
    /// NOTE(review): the event recorded after the second lock expiry is not asserted here.
    #[tokio::test]
    async fn hanging_lock_should_be_cleaned_and_execution_retried() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let lock_expiry = Duration::from_millis(100);
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry,
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
        };

        let worker = Arc::new(SleepyWorker {
            duration: lock_expiry + Duration::from_millis(1), // sleep more than allowed by the lock expiry
            result: SupportedFunctionReturnValue::None,
            exported: [FunctionMetadata {
                ffqn: FFQN_SOME,
                parameter_types: ParameterTypes::default(),
                return_type: None,
                extension: None,
                submittable: true,
            }],
        });
        // Create an execution
        let execution_id = ExecutionId::generate();
        // Used as `retry_exp_backoff`; the test expects it as the backoff after the timeout.
        let timeout_duration = Duration::from_millis(300);
        let db_connection = db_pool.connection();
        db_connection
            .create(CreateRequest {
                created_at: sim_clock.now(),
                execution_id: execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: timeout_duration,
                max_retries: 1,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();

        // The executor is ticked directly (not via `tick_fn`) so the spawned, still-running
        // task handles can be inspected.
        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
        let executor = ExecTask::new(
            worker,
            exec_config,
            sim_clock.clone(),
            db_pool.clone(),
            ffqns,
        );
        let mut first_execution_progress = executor
            .tick(sim_clock.now(), RunId::generate())
            .await
            .unwrap();
        assert_eq!(1, first_execution_progress.executions.len());
        // Started hanging, wait for lock expiry.
        sim_clock.move_time_forward(lock_expiry);
        // cleanup should be called
        let now_after_first_lock_expiry = sim_clock.now();
        {
            debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
            // The executor must not pick up the still-locked execution.
            let cleanup_progress = executor
                .tick(now_after_first_lock_expiry, RunId::generate())
                .await
                .unwrap();
            assert!(cleanup_progress.executions.is_empty());
        }
        {
            // The watcher is what expires the hanging lock.
            let expired_locks = expired_timers_watcher::tick(
                db_pool.connection().as_ref(),
                now_after_first_lock_expiry,
            )
            .await
            .unwrap()
            .expired_locks;
            assert_eq!(1, expired_locks);
        }
        // The first worker task is still sleeping.
        assert!(
            !first_execution_progress
                .executions
                .pop()
                .unwrap()
                .1
                .is_finished()
        );

        let execution_log = db_connection.get(&execution_id).await.unwrap();
        let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
        assert_matches!(
            &execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
                created_at: at,
                backtrace_id: None,
            } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
        );
        assert_eq!(
            PendingState::PendingAt {
                scheduled_at: expected_first_timeout_expiry
            },
            execution_log.pending_state
        );
        sim_clock.move_time_forward(timeout_duration);
        let now_after_first_timeout = sim_clock.now();
        debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");

        // Once the backoff has elapsed, the retry is picked up and hangs again.
        let mut second_execution_progress = executor
            .tick(now_after_first_timeout, RunId::generate())
            .await
            .unwrap();
        assert_eq!(1, second_execution_progress.executions.len());

        // Started hanging, wait for lock expiry.
        sim_clock.move_time_forward(lock_expiry);
        // cleanup should be called
        let now_after_second_lock_expiry = sim_clock.now();
        debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
        {
            let cleanup_progress = executor
                .tick(now_after_second_lock_expiry, RunId::generate())
                .await
                .unwrap();
            assert!(cleanup_progress.executions.is_empty());
        }
        {
            let expired_locks = expired_timers_watcher::tick(
                db_pool.connection().as_ref(),
                now_after_second_lock_expiry,
            )
            .await
            .unwrap()
            .expired_locks;
            assert_eq!(1, expired_locks);
        }
        // The second worker task is still sleeping.
        assert!(
            !second_execution_progress
                .executions
                .pop()
                .unwrap()
                .1
                .is_finished()
        );

        drop(db_connection);
        drop(executor);
        db_pool.close().await.unwrap();
    }
1573}