obeli_sk_executor/
executor.rs

use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
use assert_matches::assert_matches;
use chrono::{DateTime, Utc};
use concepts::storage::{
    AppendRequest, DbPool, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
    LockedExecution,
};
use concepts::time::ClockFn;
use concepts::{ComponentId, FinishedExecutionResult, FunctionMetadata, StrVariant};
use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
use concepts::{
    FinishedExecutionError,
    storage::{DbConnection, DbError, ExecutionEventInner, JoinSetResponse, Version},
};
use concepts::{JoinSetId, PermanentFailureKind};
use std::marker::PhantomData;
use std::{
    sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    },
    time::Duration,
};
use tokio::task::{AbortHandle, JoinHandle};
use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};

#[derive(Debug, Clone)]
pub struct ExecConfig {
    pub lock_expiry: Duration,
    pub tick_sleep: Duration,
    pub batch_size: u32,
    pub component_id: ComponentId,
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
}
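// Illustrative only: one way a caller might construct a config. The values and
// the `ComponentId::dummy_activity()` helper are assumptions for the sketch,
// not defaults:
//
// let config = ExecConfig {
//     lock_expiry: Duration::from_secs(1),
//     tick_sleep: Duration::from_millis(100),
//     batch_size: 5,
//     component_id: ComponentId::dummy_activity(),
//     task_limiter: Some(Arc::new(tokio::sync::Semaphore::new(5))),
// };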

pub struct ExecTask<C: ClockFn, DB: DbConnection, P: DbPool<DB>> {
    worker: Arc<dyn Worker>,
    config: ExecConfig,
    clock_fn: C, // Used for obtaining current time when the execution finishes.
    db_pool: P,
    executor_id: ExecutorId,
    phantom_data: PhantomData<DB>,
    ffqns: Arc<[FunctionFqn]>,
}

#[derive(derive_more::Debug, Default)]
pub struct ExecutionProgress {
    #[debug(skip)]
    #[allow(dead_code)]
    executions: Vec<(ExecutionId, JoinHandle<()>)>,
}

impl ExecutionProgress {
    #[cfg(feature = "test")]
    pub async fn wait_for_tasks(self) -> Result<usize, tokio::task::JoinError> {
        let execs = self.executions.len();
        for (_, handle) in self.executions {
            handle.await?;
        }
        Ok(execs)
    }
}

#[derive(derive_more::Debug)]
pub struct ExecutorTaskHandle {
    #[debug(skip)]
    is_closing: Arc<AtomicBool>,
    #[debug(skip)]
    abort_handle: AbortHandle,
    component_id: ComponentId,
    executor_id: ExecutorId,
}

impl ExecutorTaskHandle {
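    /// Signal the executor loop to stop and wait until its task exits.
    /// Already-spawned executions are not awaited here; they complete on their
    /// own Tokio tasks or are picked up again after their locks expire.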
    #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
    pub async fn close(&self) {
        trace!("Gracefully closing");
        self.is_closing.store(true, Ordering::Relaxed);
        while !self.abort_handle.is_finished() {
            tokio::time::sleep(Duration::from_millis(1)).await;
        }
        debug!("Gracefully closed");
    }
}

impl Drop for ExecutorTaskHandle {
    #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
    fn drop(&mut self) {
        if self.abort_handle.is_finished() {
            return;
        }
        warn!(executor_id= %self.executor_id, component_id=%self.component_id, "Aborting the executor task");
        self.abort_handle.abort();
    }
}

#[cfg(feature = "test")]
pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    extract_exported_ffqns_noext(worker)
}

fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    worker
        .exported_functions()
        .iter()
        .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
        .collect::<Arc<_>>()
}

impl<C: ClockFn + 'static, DB: DbConnection + 'static, P: DbPool<DB> + 'static> ExecTask<C, DB, P> {
    #[cfg(feature = "test")]
    pub fn new(
        worker: Arc<dyn Worker>,
        config: ExecConfig,
        clock_fn: C,
        db_pool: P,
        ffqns: Arc<[FunctionFqn]>,
    ) -> Self {
        Self {
            worker,
            config,
            executor_id: ExecutorId::generate(),
            db_pool,
            phantom_data: PhantomData,
            ffqns,
            clock_fn,
        }
    }

    pub fn spawn_new(
        worker: Arc<dyn Worker>,
        config: ExecConfig,
        clock_fn: C,
        db_pool: P,
        executor_id: ExecutorId,
    ) -> ExecutorTaskHandle {
        let is_closing = Arc::new(AtomicBool::default());
        let is_closing_inner = is_closing.clone();
        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
        let component_id = config.component_id.clone();
        let abort_handle = tokio::spawn(async move {
            debug!(%executor_id, component_id = %config.component_id, "Spawned executor");
            let task = Self {
                worker,
                config,
                executor_id,
                db_pool,
                phantom_data: PhantomData,
                ffqns: ffqns.clone(),
                clock_fn: clock_fn.clone(),
            };
            loop {
                let _ = task.tick(clock_fn.now()).await;
                let executed_at = clock_fn.now();
                task.db_pool
                    .connection()
                    .wait_for_pending(executed_at, ffqns.clone(), task.config.tick_sleep)
                    .await;
                if is_closing_inner.load(Ordering::Relaxed) {
                    return;
                }
            }
        })
        .abort_handle();
        ExecutorTaskHandle {
            is_closing,
            abort_handle,
            component_id,
            executor_id,
        }
    }

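    /// Acquire up to `batch_size` permits from the optional task limiter.
    /// Returns one entry per available execution slot: owned permits when a
    /// limiter is configured, `None` placeholders otherwise, so that the batch
    /// size alone caps the work of a single tick.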
    fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
        if let Some(task_limiter) = &self.config.task_limiter {
            let mut locks = Vec::new();
            for _ in 0..self.config.batch_size {
                if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
                    locks.push(Some(permit));
                } else {
                    break;
                }
            }
            locks
        } else {
            let mut vec = Vec::with_capacity(self.config.batch_size as usize);
            for _ in 0..self.config.batch_size {
                vec.push(None);
            }
            vec
        }
    }

    #[cfg(feature = "test")]
    pub async fn tick_test(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
        self.tick(executed_at).await
    }

    #[instrument(level = Level::TRACE, name = "executor.tick", skip_all, fields(executor_id = %self.executor_id, component_id = %self.config.component_id))]
    async fn tick(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
        let locked_executions = {
            let mut permits = self.acquire_task_permits();
            if permits.is_empty() {
                return Ok(ExecutionProgress::default());
            }
            let db_connection = self.db_pool.connection();
            let lock_expires_at = executed_at + self.config.lock_expiry;
            let locked_executions = db_connection
                .lock_pending(
                    permits.len(), // batch size
                    executed_at,   // fetch executions pending at or before now
                    self.ffqns.clone(),
                    executed_at, // created at
                    self.config.component_id.clone(),
                    self.executor_id,
                    lock_expires_at,
                )
                .await
                .map_err(|err| {
                    warn!(executor_id = %self.executor_id, component_id = %self.config.component_id, "lock_pending error {err:?}");
                })?;
            // Drop permits if too many were allocated.
            while permits.len() > locked_executions.len() {
                permits.pop();
            }
            assert_eq!(permits.len(), locked_executions.len());
            locked_executions.into_iter().zip(permits)
        };
        let execution_deadline = executed_at + self.config.lock_expiry;

        let mut executions = Vec::with_capacity(locked_executions.len());
        for (locked_execution, permit) in locked_executions {
            let execution_id = locked_execution.execution_id.clone();
            let join_handle = {
                let worker = self.worker.clone();
                let db_pool = self.db_pool.clone();
                let clock_fn = self.clock_fn.clone();
                let run_id = locked_execution.run_id;
                let worker_span = info_span!(parent: None, "worker",
                    "otel.name" = format!("worker {}", locked_execution.ffqn),
                    %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.executor_id, component_id = %self.config.component_id);
                locked_execution.metadata.enrich(&worker_span);
                tokio::spawn({
                    let worker_span2 = worker_span.clone();
                    async move {
                        let res = Self::run_worker(
                            worker,
                            &db_pool,
                            execution_deadline,
                            clock_fn,
                            locked_execution,
                            worker_span2,
                        )
                        .await;
                        if let Err(db_error) = res {
                            error!("Execution will be timed out, failed to write the result: `{db_error:?}`");
                        }
                        drop(permit);
                    }
                    .instrument(worker_span)
                })
            };
            executions.push((execution_id, join_handle));
        }
        Ok(ExecutionProgress { executions })
    }

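    /// Run the worker for a single locked execution and persist the outcome.
    /// A `DbError` is returned only when writing the result fails; the caller
    /// then leaves the lock to expire so the timers watcher can take over.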
    async fn run_worker(
        worker: Arc<dyn Worker>,
        db_pool: &P,
        execution_deadline: DateTime<Utc>,
        clock_fn: C,
        locked_execution: LockedExecution,
        worker_span: Span,
    ) -> Result<(), DbError> {
        debug!("Worker::run starting");
        trace!(
            version = %locked_execution.version,
            params = ?locked_execution.params,
            event_history = ?locked_execution.event_history,
            "Worker::run starting"
        );
        let can_be_retried = ExecutionLog::can_be_retried_after(
            locked_execution.temporary_event_count + 1,
            locked_execution.max_retries,
            locked_execution.retry_exp_backoff,
        );
        let unlock_expiry_on_limit_reached =
            ExecutionLog::compute_retry_duration_when_retrying_forever(
                locked_execution.temporary_event_count + 1,
                locked_execution.retry_exp_backoff,
            );
        let ctx = WorkerContext {
            execution_id: locked_execution.execution_id.clone(),
            metadata: locked_execution.metadata,
            ffqn: locked_execution.ffqn,
            params: locked_execution.params,
            event_history: locked_execution.event_history,
            responses: locked_execution
                .responses
                .into_iter()
                .map(|outer| outer.event)
                .collect(),
            version: locked_execution.version,
            execution_deadline,
            can_be_retried: can_be_retried.is_some(),
            run_id: locked_execution.run_id,
            worker_span,
        };
        let worker_result = worker.run(ctx).await;
        trace!(?worker_result, "Worker::run finished");
        let result_obtained_at = clock_fn.now();
        match Self::worker_result_to_execution_event(
            locked_execution.execution_id,
            worker_result,
            result_obtained_at,
            locked_execution.parent,
            can_be_retried,
            unlock_expiry_on_limit_reached,
        )? {
            Some(append) => {
                let db_connection = db_pool.connection();
                trace!("Appending {append:?}");
                append.clone().append(&db_connection).await
            }
            None => Ok(()),
        }
    }

    // FIXME: On a slow execution: race between `expired_timers_watcher` and this executor if retry_exp_backoff is 0.
    /// Map the `WorkerError` to a temporary or a permanent failure.
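    /// Retriable outcomes (temporary timeouts and traps with retries left)
    /// become backoff events; exhausted retries become `Finished` with a
    /// permanent error, and a `ChildFinishedResponse` is attached whenever a
    /// parent execution is waiting on the result in a join set.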
    #[expect(clippy::too_many_lines)]
    fn worker_result_to_execution_event(
        execution_id: ExecutionId,
        worker_result: WorkerResult,
        result_obtained_at: DateTime<Utc>,
        parent: Option<(ExecutionId, JoinSetId)>,
        can_be_retried: Option<Duration>,
        unlock_expiry_on_limit_reached: Duration,
    ) -> Result<Option<Append>, DbError> {
        Ok(match worker_result {
            WorkerResult::Ok(result, new_version, http_client_traces) => {
                info!(
                    "Execution finished: {}",
                    result.as_pending_state_finished_result()
                );
                let child_finished =
                    parent.map(
                        |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
                            parent_execution_id,
                            parent_join_set,
                            result: Ok(result.clone()),
                        },
                    );
                let primary_event = AppendRequest {
                    created_at: result_obtained_at,
                    event: ExecutionEventInner::Finished {
                        result: Ok(result),
                        http_client_traces,
                    },
                };

                Some(Append {
                    created_at: result_obtained_at,
                    primary_event,
                    execution_id,
                    version: new_version,
                    child_finished,
                })
            }
            WorkerResult::DbUpdatedByWorkerOrWatcher => None,
            WorkerResult::Err(err) => {
                let reason = err.to_string();
                let (primary_event, child_finished, version) = match err {
                    WorkerError::TemporaryTimeout {
                        http_client_traces,
                        version,
                    } => {
                        if let Some(duration) = can_be_retried {
                            let backoff_expires_at = result_obtained_at + duration;
                            info!(
                                "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
                            );
                            (
                                ExecutionEventInner::TemporarilyTimedOut {
                                    backoff_expires_at,
                                    http_client_traces,
                                },
                                None,
                                version,
                            )
                        } else {
                            info!("Permanent timeout");
                            let result = Err(FinishedExecutionError::PermanentTimeout);
                            let child_finished =
                                parent.map(|(parent_execution_id, parent_join_set)| {
                                    ChildFinishedResponse {
                                        parent_execution_id,
                                        parent_join_set,
                                        result: result.clone(),
                                    }
                                });
                            (
                                ExecutionEventInner::Finished {
                                    result,
                                    http_client_traces,
                                },
                                child_finished,
                                version,
                            )
                        }
                    }
                    WorkerError::DbError(db_error) => {
                        return Err(db_error);
                    }
                    WorkerError::ActivityTrap {
                        reason: reason_inner,
                        trap_kind,
                        detail,
                        version,
                        http_client_traces,
                    } => {
                        if let Some(duration) = can_be_retried {
                            let expires_at = result_obtained_at + duration;
                            debug!(
                                "Retrying activity {trap_kind} execution after {duration:?} at {expires_at}"
                            );
                            (
                                ExecutionEventInner::TemporarilyFailed {
                                    backoff_expires_at: expires_at,
                                    reason_full: StrVariant::from(reason),
                                    reason_inner: StrVariant::from(reason_inner),
                                    detail: Some(detail),
                                    http_client_traces,
                                },
                                None,
                                version,
                            )
                        } else {
                            info!(
                                "Activity {trap_kind} marked as permanent failure - {reason_inner}"
                            );
                            let result = Err(FinishedExecutionError::PermanentFailure {
                                reason_inner,
                                reason_full: reason,
                                kind: PermanentFailureKind::ActivityTrap,
                                detail: Some(detail),
                            });
                            let child_finished =
                                parent.map(|(parent_execution_id, parent_join_set)| {
                                    ChildFinishedResponse {
                                        parent_execution_id,
                                        parent_join_set,
                                        result: result.clone(),
                                    }
                                });
                            (
                                ExecutionEventInner::Finished {
                                    result,
                                    http_client_traces,
                                },
                                child_finished,
                                version,
                            )
                        }
                    }
                    WorkerError::ActivityReturnedError {
                        detail,
                        version,
                        http_client_traces,
                    } => {
                        let duration = can_be_retried.expect(
                            "ActivityReturnedError must not be returned when retries are exhausted",
                        );
                        let expires_at = result_obtained_at + duration;
                        debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
                        (
                            ExecutionEventInner::TemporarilyFailed {
                                backoff_expires_at: expires_at,
                                reason_full: StrVariant::Static("activity returned error"),
                                reason_inner: StrVariant::Static("activity returned error"),
                                detail,
                                http_client_traces,
                            },
                            None,
                            version,
                        )
                    }
                    WorkerError::TemporaryWorkflowTrap {
                        reason: reason_inner,
                        kind,
                        detail,
                        version,
                    } => {
                        let duration = can_be_retried.expect("workflows are retried forever");
                        let expires_at = result_obtained_at + duration;
                        debug!(
                            "Retrying workflow {kind} execution after {duration:?} at {expires_at}"
                        );
                        (
                            ExecutionEventInner::TemporarilyFailed {
                                backoff_expires_at: expires_at,
                                reason_full: StrVariant::from(reason),
                                reason_inner: StrVariant::from(reason_inner),
                                detail,
                                http_client_traces: None,
                            },
                            None,
                            version,
                        )
                    }
                    WorkerError::LimitReached {
                        reason: inner_reason,
                        version: new_version,
                    } => {
                        let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
                        warn!(
                            "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
                        );
                        (
                            ExecutionEventInner::Unlocked {
                                backoff_expires_at: expires_at,
                                reason: StrVariant::from(reason),
                            },
                            None,
                            new_version,
                        )
                    }
                    WorkerError::FatalError(fatal_error, version) => {
                        info!("Fatal worker error - {fatal_error:?}");
                        let result = Err(FinishedExecutionError::from(fatal_error));
                        let child_finished =
                            parent.map(|(parent_execution_id, parent_join_set)| {
                                ChildFinishedResponse {
                                    parent_execution_id,
                                    parent_join_set,
                                    result: result.clone(),
                                }
                            });
                        (
                            ExecutionEventInner::Finished {
                                result,
                                http_client_traces: None,
                            },
                            child_finished,
                            version,
                        )
                    }
                };
                Some(Append {
                    created_at: result_obtained_at,
                    primary_event: AppendRequest {
                        created_at: result_obtained_at,
                        event: primary_event,
                    },
                    execution_id,
                    version,
                    child_finished,
                })
            }
        })
    }
}

#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    pub(crate) parent_execution_id: ExecutionId,
    pub(crate) parent_join_set: JoinSetId,
    pub(crate) result: FinishedExecutionResult,
}

#[derive(Debug, Clone)]
pub(crate) struct Append {
    pub(crate) created_at: DateTime<Utc>,
    pub(crate) primary_event: AppendRequest,
    pub(crate) execution_id: ExecutionId,
    pub(crate) version: Version,
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}

impl Append {
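    /// Persist the outcome. When the finished execution has a parent, the
    /// primary event and the parent's `ChildExecutionFinished` response are
    /// appended through a single `append_batch_respond_to_parent` call;
    /// otherwise a plain `append_batch` is used.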
    pub(crate) async fn append(self, db_connection: &impl DbConnection) -> Result<(), DbError> {
        if let Some(child_finished) = self.child_finished {
            assert_matches!(
                &self.primary_event,
                AppendRequest {
                    event: ExecutionEventInner::Finished { .. },
                    ..
                }
            );
            let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
            db_connection
                .append_batch_respond_to_parent(
                    derived.clone(),
                    self.created_at,
                    vec![self.primary_event],
                    self.version.clone(),
                    child_finished.parent_execution_id,
                    JoinSetResponseEventOuter {
                        created_at: self.created_at,
                        event: JoinSetResponseEvent {
                            join_set_id: child_finished.parent_join_set,
                            event: JoinSetResponse::ChildExecutionFinished {
                                child_execution_id: derived,
                                // Since self.primary_event is a finished event, the version will remain the same.
                                finished_version: self.version,
                                result: child_finished.result,
                            },
                        },
                    },
                )
                .await?;
        } else {
            db_connection
                .append_batch(
                    self.created_at,
                    vec![self.primary_event],
                    self.execution_id,
                    self.version,
                )
                .await?;
        }
        Ok(())
    }
}

#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    impl SimpleWorker {
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(IndexMap::from([(
                Version::new(2),
                (vec![], res),
            )]))))
        }

        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            Self {
                worker_results_rev: self.worker_results_rev,
                exported: [FunctionMetadata {
                    ffqn: ffqn.clone(),
                    parameter_types: ParameterTypes::default(),
                    return_type: None,
                    extension: None,
                    submittable: true,
                }],
                ffqn,
            }
        }

        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: [FunctionMetadata {
                    ffqn: FFQN_SOME,
                    parameter_types: ParameterTypes::default(),
                    return_type: None,
                    extension: None,
                    submittable: true,
                }],
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            let (expected_version, (expected_eh, worker_result)) =
                self.worker_results_rev.lock().unwrap().pop().unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}

#[cfg(test)]
mod tests {
    use self::simple_worker::SimpleWorker;
    use super::*;
    use crate::worker::FatalError;
    use crate::{expired_timers_watcher, worker::WorkerResult};
    use assert_matches::assert_matches;
    use async_trait::async_trait;
    use concepts::storage::{CreateRequest, JoinSetRequest};
    use concepts::storage::{
        DbConnection, ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState,
    };
    use concepts::time::Now;
    use concepts::{
        ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, StrVariant,
        SupportedFunctionReturnValue, TrapKind,
    };
    use db_tests::Database;
    use indexmap::IndexMap;
    use simple_worker::FFQN_SOME;
    use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
    use test_utils::set_up;
    use test_utils::sim_clock::{ConstClock, SimClock};

    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");

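    /// Test helper: run a single executor tick and wait until every execution
    /// spawned by that tick has finished.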
    async fn tick_fn<
        W: Worker + Debug,
        C: ClockFn + 'static,
        DB: DbConnection + 'static,
        P: DbPool<DB> + 'static,
    >(
        config: ExecConfig,
        clock_fn: C,
        db_pool: P,
        worker: Arc<W>,
        executed_at: DateTime<Utc>,
    ) -> ExecutionProgress {
        trace!("Ticking with {worker:?}");
        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
        let executor = ExecTask::new(worker, config, clock_fn, db_pool, ffqns);
        let mut execution_progress = executor.tick(executed_at).await.unwrap();
        loop {
            execution_progress
                .executions
                .retain(|(_, abort_handle)| !abort_handle.is_finished());
            if execution_progress.executions.is_empty() {
                return execution_progress;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    }

    #[tokio::test]
    async fn execute_simple_lifecycle_tick_based_mem() {
        let created_at = Now.now();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
        db_pool.close().await.unwrap();
    }

    #[cfg(not(madsim))]
    #[tokio::test]
    async fn execute_simple_lifecycle_tick_based_sqlite() {
        let created_at = Now.now();
        let (_guard, db_pool) = Database::Sqlite.set_up().await;
        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
        db_pool.close().await.unwrap();
    }

    async fn execute_simple_lifecycle_tick_based<
        DB: DbConnection + 'static,
        P: DbPool<DB> + 'static,
        C: ClockFn + 'static,
    >(
        pool: P,
        clock_fn: C,
    ) {
        set_up();
        let created_at = clock_fn.now();
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::from_millis(100),
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };

        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            pool,
            exec_config,
            Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
                SupportedFunctionReturnValue::None,
                Version::new(2),
                None,
            ))),
            tick_fn,
        )
        .await;
        assert_matches!(
            execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Ok(SupportedFunctionReturnValue::None),
                    http_client_traces: None
                },
                created_at: _,
                backtrace_id: None,
            }
        );
    }

    #[tokio::test]
    async fn stochastic_execute_simple_lifecycle_task_based_mem() {
        set_up();
        let created_at = Now.now();
        let clock_fn = ConstClock(created_at);
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };

        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
            SupportedFunctionReturnValue::None,
            Version::new(2),
            None,
        )));
        let exec_task = ExecTask::spawn_new(
            worker.clone(),
            exec_config.clone(),
            clock_fn,
            db_pool.clone(),
            ExecutorId::generate(),
        );

        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            db_pool.clone(),
            exec_config,
            worker,
            |_, _, _, _, _| async {
                tokio::time::sleep(Duration::from_secs(1)).await; // non deterministic if not run in madsim
                ExecutionProgress::default()
            },
        )
        .await;
        exec_task.close().await;
        db_pool.close().await.unwrap();
        assert_matches!(
            execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Ok(SupportedFunctionReturnValue::None),
                    http_client_traces: None
                },
                created_at: _,
                backtrace_id: None,
            }
        );
    }

    struct CreateAndTickConfig {
        execution_id: ExecutionId,
        created_at: DateTime<Utc>,
        max_retries: u32,
        executed_at: DateTime<Utc>,
        retry_exp_backoff: Duration,
    }

    async fn create_and_tick<
        W: Worker,
        C: ClockFn,
        DB: DbConnection,
        P: DbPool<DB>,
        T: FnMut(ExecConfig, C, P, Arc<W>, DateTime<Utc>) -> F,
        F: Future<Output = ExecutionProgress>,
    >(
        config: CreateAndTickConfig,
        clock_fn: C,
        db_pool: P,
        exec_config: ExecConfig,
        worker: Arc<W>,
        mut tick: T,
    ) -> ExecutionLog {
        // Create an execution
        let db_connection = db_pool.connection();
        db_connection
            .create(CreateRequest {
                created_at: config.created_at,
                execution_id: config.execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: config.created_at,
                retry_exp_backoff: config.retry_exp_backoff,
                max_retries: config.max_retries,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();
        // execute!
        tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
        debug!("Execution history after tick: {execution_log:?}");
        // check that DB contains Created and Locked events.
        assert_matches!(
            execution_log.events.first().unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Created { .. },
                created_at: actually_created_at,
                backtrace_id: None,
            }
            if config.created_at == *actually_created_at
        );
        let locked_at = assert_matches!(
            execution_log.events.get(1).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: locked_at,
                backtrace_id: None,
            } if config.created_at <= *locked_at
            => *locked_at
        );
        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
            event: _,
            created_at: executed_at,
            backtrace_id: None,
        } if *executed_at >= locked_at);
        execution_log
    }

    #[expect(clippy::too_many_lines)]
    #[tokio::test]
    async fn activity_trap_should_trigger_an_execution_retry() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };
        let expected_reason = "error reason";
        let expected_detail = "error detail";
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: expected_detail.to_string(),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        let retry_exp_backoff = Duration::from_millis(100);
        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at: sim_clock.now(),
                max_retries: 1,
                executed_at: sim_clock.now(),
                retry_exp_backoff,
            },
            sim_clock.clone(),
            db_pool.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        assert_eq!(3, execution_log.events.len());
        {
            let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
                &execution_log.events.get(2).unwrap(),
                ExecutionEvent {
                    event: ExecutionEventInner::TemporarilyFailed {
                        reason_inner,
                        reason_full,
                        detail,
                        backoff_expires_at,
                        http_client_traces: None,
                    },
                    created_at: at,
                    backtrace_id: None,
                }
                => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
            );
            assert_eq!(expected_reason, reason_inner.deref());
            assert_eq!(
                format!("activity trap: {expected_reason}"),
                reason_full.deref()
            );
            assert_eq!(Some(expected_detail), detail.as_deref());
            assert_eq!(at, sim_clock.now());
            assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
        }
        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
            std::sync::Mutex::new(IndexMap::from([(
                Version::new(4),
                (
                    vec![],
                    WorkerResult::Ok(SupportedFunctionReturnValue::None, Version::new(4), None),
                ),
            )])),
        )));
        // noop until `retry_exp_backoff` expires
        assert!(
            tick_fn(
                exec_config.clone(),
                sim_clock.clone(),
                db_pool.clone(),
                worker.clone(),
                sim_clock.now(),
            )
            .await
            .executions
            .is_empty()
        );
        // tick again to finish the execution
        sim_clock.move_time_forward(retry_exp_backoff).await;
        tick_fn(
            exec_config,
            sim_clock.clone(),
            db_pool.clone(),
            worker,
            sim_clock.now(),
        )
        .await;
        let execution_log = {
            let db_connection = db_pool.connection();
            db_connection
                .get(&execution_log.execution_id)
                .await
                .unwrap()
        };
        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
        assert_matches!(
            execution_log.events.get(3).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: at,
                backtrace_id: None,
            } if *at == sim_clock.now()
        );
        assert_matches!(
            execution_log.events.get(4).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Ok(SupportedFunctionReturnValue::None),
                    http_client_traces: None
                },
                created_at: finished_at,
                backtrace_id: None,
            } if *finished_at == sim_clock.now()
        );
        db_pool.close().await.unwrap();
    }

    #[tokio::test]
    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
        set_up();
        let created_at = Now.now();
        let clock_fn = ConstClock(created_at);
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };

        let expected_reason = "error reason";
        let expected_detail = "error detail";
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: expected_detail.to_string(),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            db_pool.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        assert_eq!(3, execution_log.events.len());
        let (reason, kind, detail) = assert_matches!(
            &execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished{
                    result: Err(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_}),
                    http_client_traces: None
                },
                created_at: at,
                backtrace_id: None,
            } if *at == created_at
            => (reason_inner, kind, detail)
        );
        assert_eq!(expected_reason, *reason);
        assert_eq!(Some(expected_detail), detail.as_deref());
        assert_eq!(PermanentFailureKind::ActivityTrap, *kind);

        db_pool.close().await.unwrap();
    }

    #[tokio::test]
    async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
        let worker_error = WorkerError::ActivityTrap {
            reason: "error reason".to_string(),
            trap_kind: TrapKind::Trap,
            detail: "detail".to_string(),
            version: Version::new(2),
            http_client_traces: None,
        };
        let expected_child_err = FinishedExecutionError::PermanentFailure {
            reason_full: "activity trap: error reason".to_string(),
            reason_inner: "error reason".to_string(),
            kind: PermanentFailureKind::ActivityTrap,
            detail: Some("detail".to_string()),
        };
        child_execution_permanently_failed_should_notify_parent(
            WorkerResult::Err(worker_error),
            expected_child_err,
        )
        .await;
    }

    // TODO: Add test for WorkerError::TemporaryTimeout

    #[tokio::test]
    async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
        let expected_child_err = FinishedExecutionError::PermanentTimeout;
        child_execution_permanently_failed_should_notify_parent(
            WorkerResult::DbUpdatedByWorkerOrWatcher,
            expected_child_err,
        )
        .await;
    }

    #[tokio::test]
    async fn child_execution_permanently_failed_should_notify_parent_unhandled_child() {
        let parent_id = ExecutionId::from_parts(1, 1);
        let join_set_id_outer =
            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("outer")).unwrap();
        let root_cause_id = parent_id.next_level(&join_set_id_outer);
        let join_set_id_inner =
            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("inner")).unwrap();
        let child_execution_id = root_cause_id.next_level(&join_set_id_inner);
        let worker_error = WorkerError::FatalError(
            FatalError::UnhandledChildExecutionError {
                child_execution_id: child_execution_id.clone(),
                root_cause_id: root_cause_id.clone(),
            },
            Version::new(2),
        );
        let expected_child_err = FinishedExecutionError::UnhandledChildExecutionError {
            child_execution_id,
            root_cause_id,
        };
        child_execution_permanently_failed_should_notify_parent(
            WorkerResult::Err(worker_error),
            expected_child_err,
        )
        .await;
    }

1208    #[expect(clippy::too_many_lines)]
1209    async fn child_execution_permanently_failed_should_notify_parent(
1210        worker_result: WorkerResult,
1211        expected_child_err: FinishedExecutionError,
1212    ) {
1213        use concepts::storage::JoinSetResponseEventOuter;
1214        const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1215
1216        set_up();
1217        let sim_clock = SimClock::default();
1218        let (_guard, db_pool) = Database::Memory.set_up().await;
1219
1220        let parent_worker = Arc::new(SimpleWorker::with_single_result(
1221            WorkerResult::DbUpdatedByWorkerOrWatcher,
1222        ));
1223        let parent_execution_id = ExecutionId::generate();
1224        db_pool
1225            .connection()
1226            .create(CreateRequest {
1227                created_at: sim_clock.now(),
1228                execution_id: parent_execution_id.clone(),
1229                ffqn: FFQN_SOME,
1230                params: Params::empty(),
1231                parent: None,
1232                metadata: concepts::ExecutionMetadata::empty(),
1233                scheduled_at: sim_clock.now(),
1234                retry_exp_backoff: Duration::ZERO,
1235                max_retries: 0,
1236                component_id: ComponentId::dummy_activity(),
1237                scheduled_by: None,
1238            })
1239            .await
1240            .unwrap();
1241        tick_fn(
1242            ExecConfig {
1243                batch_size: 1,
1244                lock_expiry: LOCK_EXPIRY,
1245                tick_sleep: Duration::ZERO,
1246                component_id: ComponentId::dummy_activity(),
1247                task_limiter: None,
1248            },
1249            sim_clock.clone(),
1250            db_pool.clone(),
1251            parent_worker,
1252            sim_clock.now(),
1253        )
1254        .await;
1255
1256        let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1257        let child_execution_id = parent_execution_id.next_level(&join_set_id);
1258        // executor does not append anything, this should have been written by the worker:
1259        {
1260            let child = CreateRequest {
1261                created_at: sim_clock.now(),
1262                execution_id: ExecutionId::Derived(child_execution_id.clone()),
1263                ffqn: FFQN_CHILD,
1264                params: Params::empty(),
1265                parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1266                metadata: concepts::ExecutionMetadata::empty(),
1267                scheduled_at: sim_clock.now(),
1268                retry_exp_backoff: Duration::ZERO,
1269                max_retries: 0,
1270                component_id: ComponentId::dummy_activity(),
1271                scheduled_by: None,
1272            };
1273            let current_time = sim_clock.now();
1274            let join_set = AppendRequest {
1275                created_at: current_time,
1276                event: ExecutionEventInner::HistoryEvent {
1277                    event: HistoryEvent::JoinSetCreate {
1278                        join_set_id: join_set_id.clone(),
1279                        closing_strategy: ClosingStrategy::Complete,
1280                    },
1281                },
1282            };
1283            let child_exec_req = AppendRequest {
1284                created_at: current_time,
1285                event: ExecutionEventInner::HistoryEvent {
1286                    event: HistoryEvent::JoinSetRequest {
1287                        join_set_id: join_set_id.clone(),
1288                        request: JoinSetRequest::ChildExecutionRequest {
1289                            child_execution_id: child_execution_id.clone(),
1290                        },
1291                    },
1292                },
1293            };
1294            let join_next = AppendRequest {
1295                created_at: current_time,
1296                event: ExecutionEventInner::HistoryEvent {
1297                    event: HistoryEvent::JoinNext {
1298                        join_set_id: join_set_id.clone(),
1299                        run_expires_at: sim_clock.now(),
1300                        closing: false,
1301                    },
1302                },
1303            };
1304            db_pool
1305                .connection()
1306                .append_batch_create_new_execution(
1307                    current_time,
1308                    vec![join_set, child_exec_req, join_next],
1309                    parent_execution_id.clone(),
1310                    Version::new(2),
1311                    vec![child],
1312                )
1313                .await
1314                .unwrap();
1315        }
1316
1317        let child_worker =
1318            Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));
1319
1320        // execute the child
1321        tick_fn(
1322            ExecConfig {
1323                batch_size: 1,
1324                lock_expiry: LOCK_EXPIRY,
1325                tick_sleep: Duration::ZERO,
1326                component_id: ComponentId::dummy_activity(),
1327                task_limiter: None,
1328            },
1329            sim_clock.clone(),
1330            db_pool.clone(),
1331            child_worker,
1332            sim_clock.now(),
1333        )
1334        .await;
1335        if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
1336            // In case of timeout, let the timers watcher handle it
1337            sim_clock.move_time_forward(LOCK_EXPIRY).await;
1338            expired_timers_watcher::tick(&db_pool.connection(), sim_clock.now())
1339                .await
1340                .unwrap();
1341        }
1342        let child_log = db_pool
1343            .connection()
1344            .get(&ExecutionId::Derived(child_execution_id.clone()))
1345            .await
1346            .unwrap();
1347        assert!(child_log.pending_state.is_finished());
1348        assert_eq!(
1349            Version(2),
1350            child_log.next_version,
1351            "created = 0, locked = 1, with_single_result = 2"
1352        );
1353        assert_eq!(
1354            ExecutionEventInner::Finished {
1355                result: Err(expected_child_err),
1356                http_client_traces: None
1357            },
1358            child_log.last_event().event
1359        );
        let parent_log = db_pool
            .connection()
            .get(&parent_execution_id)
            .await
            .unwrap();
        assert_matches!(
            parent_log.pending_state,
            PendingState::PendingAt {
                scheduled_at
            } if scheduled_at == sim_clock.now(),
            "parent should be back to pending"
        );
        let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
            parent_log.responses.last(),
            Some(JoinSetResponseEventOuter {
                created_at: at,
                event: JoinSetResponseEvent {
                    join_set_id: found_join_set_id,
                    event: JoinSetResponse::ChildExecutionFinished {
                        child_execution_id: found_child_execution_id,
                        finished_version,
                        result: found_result,
                    }
                }
            })
            if *at == sim_clock.now()
            => (found_join_set_id, found_child_execution_id, finished_version, found_result)
        );
        assert_eq!(join_set_id, *found_join_set_id);
        assert_eq!(child_execution_id, *found_child_execution_id);
        assert_eq!(child_log.next_version, *child_finished_version);
        assert!(found_result.is_err());

        db_pool.close().await.unwrap();
    }

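    // A worker that sleeps for a configurable duration before returning, used to simulate executions that outlive their lock.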
    #[derive(Clone, Debug)]
    struct SleepyWorker {
        duration: Duration,
        result: SupportedFunctionReturnValue,
        exported: [FunctionMetadata; 1],
    }

    #[async_trait]
    impl Worker for SleepyWorker {
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            tokio::time::sleep(self.duration).await;
            WorkerResult::Ok(self.result.clone(), ctx.version, None)
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }

    #[expect(clippy::too_many_lines)]
    #[tokio::test]
    async fn hanging_lock_should_be_cleaned_and_execution_retried() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let lock_expiry = Duration::from_millis(100);
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry,
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };

        let worker = Arc::new(SleepyWorker {
            duration: lock_expiry + Duration::from_millis(1), // sleep more than allowed by the lock expiry
            result: SupportedFunctionReturnValue::None,
            exported: [FunctionMetadata {
                ffqn: FFQN_SOME,
                parameter_types: ParameterTypes::default(),
                return_type: None,
                extension: None,
                submittable: true,
            }],
        });
        // Create an execution
        let execution_id = ExecutionId::generate();
        let timeout_duration = Duration::from_millis(300);
        let db_connection = db_pool.connection();
        db_connection
            .create(CreateRequest {
                created_at: sim_clock.now(),
                execution_id: execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: timeout_duration,
                max_retries: 1,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();

        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
        let executor = ExecTask::new(
            worker,
            exec_config,
            sim_clock.clone(),
            db_pool.clone(),
            ffqns,
        );
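        // The first tick locks the execution and spawns a task running the sleepy worker.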
        let mut first_execution_progress = executor.tick(sim_clock.now()).await.unwrap();
        assert_eq!(1, first_execution_progress.executions.len());
        // Started hanging, wait for lock expiry.
        sim_clock.move_time_forward(lock_expiry).await;
        // cleanup should be called
        let now_after_first_lock_expiry = sim_clock.now();
        {
            debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
            let cleanup_progress = executor.tick(now_after_first_lock_expiry).await.unwrap();
            assert!(cleanup_progress.executions.is_empty());
        }
        {
            let expired_locks =
                expired_timers_watcher::tick(&db_pool.connection(), now_after_first_lock_expiry)
                    .await
                    .unwrap()
                    .expired_locks;
            assert_eq!(1, expired_locks);
        }
        assert!(
            !first_execution_progress
                .executions
                .pop()
                .unwrap()
                .1
                .is_finished()
        );

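        // The expired lock must be recorded as a temporary timeout with a retry backoff.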
        let execution_log = db_connection.get(&execution_id).await.unwrap();
        let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
        assert_matches!(
            &execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
                created_at: at,
                backtrace_id: None,
            } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
        );
        assert_eq!(
            PendingState::PendingAt {
                scheduled_at: expected_first_timeout_expiry
            },
            execution_log.pending_state
        );
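        // Advance past the retry backoff so the execution becomes pending again and is picked up a second time.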
        sim_clock.move_time_forward(timeout_duration).await;
        let now_after_first_timeout = sim_clock.now();
        debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");

        let mut second_execution_progress = executor.tick(now_after_first_timeout).await.unwrap();
        assert_eq!(1, second_execution_progress.executions.len());

        // Started hanging, wait for lock expiry.
        sim_clock.move_time_forward(lock_expiry).await;
        // cleanup should be called
        let now_after_second_lock_expiry = sim_clock.now();
        debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
        {
            let cleanup_progress = executor.tick(now_after_second_lock_expiry).await.unwrap();
            assert!(cleanup_progress.executions.is_empty());
        }
        {
            let expired_locks =
                expired_timers_watcher::tick(&db_pool.connection(), now_after_second_lock_expiry)
                    .await
                    .unwrap()
                    .expired_locks;
            assert_eq!(1, expired_locks);
        }
        assert!(
            !second_execution_progress
                .executions
                .pop()
                .unwrap()
                .1
                .is_finished()
        );

        drop(db_connection);
        drop(executor);
        db_pool.close().await.unwrap();
    }
}