// obeli_sk_executor/executor.rs

1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::storage::{
5    AppendRequest, DbPool, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
6    LockedExecution,
7};
8use concepts::time::ClockFn;
9use concepts::{ComponentId, FinishedExecutionResult, FunctionMetadata, StrVariant};
10use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
11use concepts::{
12    FinishedExecutionError,
13    storage::{DbConnection, DbError, ExecutionEventInner, JoinSetResponse, Version},
14};
15use concepts::{JoinSetId, PermanentFailureKind};
16use std::marker::PhantomData;
17use std::{
18    sync::{
19        Arc,
20        atomic::{AtomicBool, Ordering},
21    },
22    time::Duration,
23};
24use tokio::task::{AbortHandle, JoinHandle};
25use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
26
27#[derive(Debug, Clone)]
28pub struct ExecConfig {
29    pub lock_expiry: Duration,
30    pub tick_sleep: Duration,
31    pub batch_size: u32,
32    pub component_id: ComponentId,
33    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
34}
35
36pub struct ExecTask<C: ClockFn, DB: DbConnection, P: DbPool<DB>> {
37    worker: Arc<dyn Worker>,
38    config: ExecConfig,
39    clock_fn: C, // Used for obtaining current time when the execution finishes.
40    db_pool: P,
41    executor_id: ExecutorId,
42    phantom_data: PhantomData<DB>,
43    ffqns: Arc<[FunctionFqn]>,
44}
45
46#[derive(derive_more::Debug, Default)]
47pub struct ExecutionProgress {
48    #[debug(skip)]
49    #[allow(dead_code)]
50    executions: Vec<(ExecutionId, JoinHandle<()>)>,
51}
52
53impl ExecutionProgress {
54    #[cfg(feature = "test")]
55    pub async fn wait_for_tasks(self) -> Result<usize, tokio::task::JoinError> {
56        let execs = self.executions.len();
57        for (_, handle) in self.executions {
58            handle.await?;
59        }
60        Ok(execs)
61    }
62}
63
64#[derive(derive_more::Debug)]
65pub struct ExecutorTaskHandle {
66    #[debug(skip)]
67    is_closing: Arc<AtomicBool>,
68    #[debug(skip)]
69    abort_handle: AbortHandle,
70    component_id: ComponentId,
71    executor_id: ExecutorId,
72}
73
74impl ExecutorTaskHandle {
75    #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
76    pub async fn close(&self) {
77        trace!("Gracefully closing");
78        self.is_closing.store(true, Ordering::Relaxed);
79        while !self.abort_handle.is_finished() {
80            tokio::time::sleep(Duration::from_millis(1)).await;
81        }
82        debug!("Gracefully closed");
83    }
84}
85
86impl Drop for ExecutorTaskHandle {
87    #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
88    fn drop(&mut self) {
89        if self.abort_handle.is_finished() {
90            return;
91        }
92        warn!(executor_id= %self.executor_id, component_id=%self.component_id, "Aborting the executor task");
93        self.abort_handle.abort();
94    }
95}
96
97#[cfg(feature = "test")]
98pub fn extract_ffqns_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
99    extract_ffqns(worker)
100}
101
102pub(crate) fn extract_ffqns(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
103    worker
104        .exported_functions()
105        .iter()
106        .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
107        .collect::<Arc<_>>()
108}
109
110impl<C: ClockFn + 'static, DB: DbConnection + 'static, P: DbPool<DB> + 'static> ExecTask<C, DB, P> {
111    #[cfg(feature = "test")]
112    pub fn new(
113        worker: Arc<dyn Worker>,
114        config: ExecConfig,
115        clock_fn: C,
116        db_pool: P,
117        ffqns: Arc<[FunctionFqn]>,
118    ) -> Self {
119        Self {
120            worker,
121            config,
122            executor_id: ExecutorId::generate(),
123            db_pool,
124            phantom_data: PhantomData,
125            ffqns,
126            clock_fn,
127        }
128    }
129
130    pub fn spawn_new(
131        worker: Arc<dyn Worker>,
132        config: ExecConfig,
133        clock_fn: C,
134        db_pool: P,
135        executor_id: ExecutorId,
136    ) -> ExecutorTaskHandle {
137        let is_closing = Arc::new(AtomicBool::default());
138        let is_closing_inner = is_closing.clone();
139        let ffqns = extract_ffqns(worker.as_ref());
140        let component_id = config.component_id.clone();
141        let abort_handle = tokio::spawn(async move {
142            debug!(%executor_id, component_id = %config.component_id, "Spawned executor");
143            let task = Self {
144                worker,
145                config,
146                executor_id,
147                db_pool,
148                phantom_data: PhantomData,
149                ffqns: ffqns.clone(),
150                clock_fn: clock_fn.clone(),
151            };
152            loop {
153                let _ = task.tick(clock_fn.now()).await;
154                let executed_at = clock_fn.now();
155                task.db_pool
156                    .connection()
157                    .wait_for_pending(executed_at, ffqns.clone(), task.config.tick_sleep)
158                    .await;
159                if is_closing_inner.load(Ordering::Relaxed) {
160                    return;
161                }
162            }
163        })
164        .abort_handle();
165        ExecutorTaskHandle {
166            is_closing,
167            abort_handle,
168            component_id,
169            executor_id,
170        }
171    }
172
173    fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
174        if let Some(task_limiter) = &self.config.task_limiter {
175            let mut locks = Vec::new();
176            for _ in 0..self.config.batch_size {
177                if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
178                    locks.push(Some(permit));
179                } else {
180                    break;
181                }
182            }
183            locks
184        } else {
185            let mut vec = Vec::with_capacity(self.config.batch_size as usize);
186            for _ in 0..self.config.batch_size {
187                vec.push(None);
188            }
189            vec
190        }
191    }
192
193    #[cfg(feature = "test")]
194    pub async fn tick_test(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
195        self.tick(executed_at).await
196    }
197
198    #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.executor_id, component_id = %self.config.component_id))]
199    async fn tick(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
200        let locked_executions = {
201            let mut permits = self.acquire_task_permits();
202            if permits.is_empty() {
203                return Ok(ExecutionProgress::default());
204            }
205            let db_connection = self.db_pool.connection();
206            let lock_expires_at = executed_at + self.config.lock_expiry;
207            let locked_executions = db_connection
208                .lock_pending(
209                    permits.len(), // batch size
210                    executed_at,   // fetch expiring before now
211                    self.ffqns.clone(),
212                    executed_at, // created at
213                    self.config.component_id.clone(),
214                    self.executor_id,
215                    lock_expires_at,
216                )
217                .await
218                .map_err(|err| {
219                    warn!(executor_id = %self.executor_id, component_id = %self.config.component_id, "lock_pending error {err:?}");
220                })?;
221            // Drop permits if too many were allocated.
222            while permits.len() > locked_executions.len() {
223                permits.pop();
224            }
225            assert_eq!(permits.len(), locked_executions.len());
226            locked_executions.into_iter().zip(permits)
227        };
228        let execution_deadline = executed_at + self.config.lock_expiry;
229
230        let mut executions = Vec::with_capacity(locked_executions.len());
231        for (locked_execution, permit) in locked_executions {
232            let execution_id = locked_execution.execution_id.clone();
233            let join_handle = {
234                let worker = self.worker.clone();
235                let db_pool = self.db_pool.clone();
236                let clock_fn = self.clock_fn.clone();
237                let run_id = locked_execution.run_id;
238                let worker_span = info_span!(parent: None, "worker",
239                    "otel.name" = format!("worker {}", locked_execution.ffqn),
240                    %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.executor_id, component_id = %self.config.component_id);
241                locked_execution.metadata.enrich(&worker_span);
242                tokio::spawn({
243                    let worker_span2 = worker_span.clone();
244                    async move {
245                        let res = Self::run_worker(
246                            worker,
247                            &db_pool,
248                            execution_deadline,
249                            clock_fn,
250                            locked_execution,
251                            worker_span2,
252                        )
253                        .await;
254                        if let Err(db_error) = res {
255                            error!("Execution will be timed out not writing `{db_error:?}`");
256                        }
257                        drop(permit);
258                    }
259                    .instrument(worker_span)
260                })
261            };
262            executions.push((execution_id, join_handle));
263        }
264        Ok(ExecutionProgress { executions })
265    }
266
267    async fn run_worker(
268        worker: Arc<dyn Worker>,
269        db_pool: &P,
270        execution_deadline: DateTime<Utc>,
271        clock_fn: C,
272        locked_execution: LockedExecution,
273        worker_span: Span,
274    ) -> Result<(), DbError> {
275        debug!("Worker::run starting");
276        trace!(
277            version = %locked_execution.version,
278            params = ?locked_execution.params,
279            event_history = ?locked_execution.event_history,
280            "Worker::run starting"
281        );
282        let can_be_retried = ExecutionLog::can_be_retried_after(
283            locked_execution.temporary_event_count + 1,
284            locked_execution.max_retries,
285            locked_execution.retry_exp_backoff,
286        );
287        let unlock_expiry_on_limit_reached =
288            ExecutionLog::compute_retry_duration_when_retrying_forever(
289                locked_execution.temporary_event_count + 1,
290                locked_execution.retry_exp_backoff,
291            );
292        let ctx = WorkerContext {
293            execution_id: locked_execution.execution_id.clone(),
294            metadata: locked_execution.metadata,
295            ffqn: locked_execution.ffqn,
296            params: locked_execution.params,
297            event_history: locked_execution.event_history,
298            responses: locked_execution
299                .responses
300                .into_iter()
301                .map(|outer| outer.event)
302                .collect(),
303            version: locked_execution.version,
304            execution_deadline,
305            can_be_retried: can_be_retried.is_some(),
306            run_id: locked_execution.run_id,
307            worker_span,
308        };
309        let worker_result = worker.run(ctx).await;
310        trace!(?worker_result, "Worker::run finished");
311        let result_obtained_at = clock_fn.now();
312        match Self::worker_result_to_execution_event(
313            locked_execution.execution_id,
314            worker_result,
315            result_obtained_at,
316            locked_execution.parent,
317            can_be_retried,
318            unlock_expiry_on_limit_reached,
319        )? {
320            Some(append) => {
321                let db_connection = db_pool.connection();
322                trace!("Appending {append:?}");
323                append.clone().append(&db_connection).await
324            }
325            None => Ok(()),
326        }
327    }
328
329    // FIXME: On a slow execution: race between `expired_timers_watcher` this if retry_exp_backoff is 0.
330    /// Map the `WorkerError` to an temporary or a permanent failure.
331    #[expect(clippy::too_many_lines)]
332    fn worker_result_to_execution_event(
333        execution_id: ExecutionId,
334        worker_result: WorkerResult,
335        result_obtained_at: DateTime<Utc>,
336        parent: Option<(ExecutionId, JoinSetId)>,
337        can_be_retried: Option<Duration>,
338        unlock_expiry_on_limit_reached: Duration,
339    ) -> Result<Option<Append>, DbError> {
340        Ok(match worker_result {
341            WorkerResult::Ok(result, new_version, http_client_traces) => {
342                info!(
343                    "Execution finished: {}",
344                    result.as_pending_state_finished_result()
345                );
346                let child_finished =
347                    parent.map(
348                        |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
349                            parent_execution_id,
350                            parent_join_set,
351                            result: Ok(result.clone()),
352                        },
353                    );
354                let primary_event = AppendRequest {
355                    created_at: result_obtained_at,
356                    event: ExecutionEventInner::Finished {
357                        result: Ok(result),
358                        http_client_traces,
359                    },
360                };
361
362                Some(Append {
363                    created_at: result_obtained_at,
364                    primary_event,
365                    execution_id,
366                    version: new_version,
367                    child_finished,
368                })
369            }
370            WorkerResult::DbUpdatedByWorkerOrWatcher => None,
371            WorkerResult::Err(err) => {
372                let reason = err.to_string();
373                let (primary_event, child_finished, version) = match err {
374                    WorkerError::TemporaryTimeout {
375                        http_client_traces,
376                        version,
377                    } => {
378                        if let Some(duration) = can_be_retried {
379                            let backoff_expires_at = result_obtained_at + duration;
380                            info!("Temporary timeout after {duration:?} at {backoff_expires_at}");
381                            (
382                                ExecutionEventInner::TemporarilyTimedOut {
383                                    backoff_expires_at,
384                                    http_client_traces,
385                                },
386                                None,
387                                version,
388                            )
389                        } else {
390                            info!("Permanent timeout");
391                            let result = Err(FinishedExecutionError::PermanentTimeout);
392                            let child_finished =
393                                parent.map(|(parent_execution_id, parent_join_set)| {
394                                    ChildFinishedResponse {
395                                        parent_execution_id,
396                                        parent_join_set,
397                                        result: result.clone(),
398                                    }
399                                });
400                            (
401                                ExecutionEventInner::Finished {
402                                    result,
403                                    http_client_traces,
404                                },
405                                child_finished,
406                                version,
407                            )
408                        }
409                    }
410                    WorkerError::DbError(db_error) => {
411                        return Err(db_error);
412                    }
413                    WorkerError::ActivityTrap {
414                        reason: reason_inner,
415                        trap_kind,
416                        detail,
417                        version,
418                        http_client_traces,
419                    } => {
420                        if let Some(duration) = can_be_retried {
421                            let expires_at = result_obtained_at + duration;
422                            debug!(
423                                "Retrying activity {trap_kind} execution after {duration:?} at {expires_at}"
424                            );
425                            (
426                                ExecutionEventInner::TemporarilyFailed {
427                                    backoff_expires_at: expires_at,
428                                    reason_full: StrVariant::from(reason),
429                                    reason_inner: StrVariant::from(reason_inner),
430                                    detail: Some(detail),
431                                    http_client_traces,
432                                },
433                                None,
434                                version,
435                            )
436                        } else {
437                            info!(
438                                "Activity {trap_kind} marked as permanent failure - {reason_inner}"
439                            );
440                            let result = Err(FinishedExecutionError::PermanentFailure {
441                                reason_inner,
442                                reason_full: reason,
443                                kind: PermanentFailureKind::ActivityTrap,
444                                detail: Some(detail),
445                            });
446                            let child_finished =
447                                parent.map(|(parent_execution_id, parent_join_set)| {
448                                    ChildFinishedResponse {
449                                        parent_execution_id,
450                                        parent_join_set,
451                                        result: result.clone(),
452                                    }
453                                });
454                            (
455                                ExecutionEventInner::Finished {
456                                    result,
457                                    http_client_traces,
458                                },
459                                child_finished,
460                                version,
461                            )
462                        }
463                    }
464                    WorkerError::ActivityReturnedError {
465                        detail,
466                        version,
467                        http_client_traces,
468                    } => {
469                        let duration = can_be_retried.expect(
470                            "ActivityReturnedError must not be returned when retries are exhausted",
471                        );
472                        let expires_at = result_obtained_at + duration;
473                        debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
474                        (
475                            ExecutionEventInner::TemporarilyFailed {
476                                backoff_expires_at: expires_at,
477                                reason_full: StrVariant::Static("activity returned error"),
478                                reason_inner: StrVariant::Static("activity returned error"),
479                                detail,
480                                http_client_traces,
481                            },
482                            None,
483                            version,
484                        )
485                    }
486                    WorkerError::TemporaryWorkflowTrap {
487                        reason: reason_inner,
488                        kind,
489                        detail,
490                        version,
491                    } => {
492                        let duration = can_be_retried.expect("workflows are retried forever");
493                        let expires_at = result_obtained_at + duration;
494                        debug!(
495                            "Retrying workflow {kind} execution after {duration:?} at {expires_at}"
496                        );
497                        (
498                            ExecutionEventInner::TemporarilyFailed {
499                                backoff_expires_at: expires_at,
500                                reason_full: StrVariant::from(reason),
501                                reason_inner: StrVariant::from(reason_inner),
502                                detail,
503                                http_client_traces: None,
504                            },
505                            None,
506                            version,
507                        )
508                    }
509                    WorkerError::LimitReached {
510                        reason: inner_reason,
511                        version: new_version,
512                    } => {
513                        let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
514                        warn!(
515                            "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
516                        );
517                        (
518                            ExecutionEventInner::Unlocked {
519                                backoff_expires_at: expires_at,
520                                reason: StrVariant::from(reason),
521                            },
522                            None,
523                            new_version,
524                        )
525                    }
526                    WorkerError::FatalError(fatal_error, version) => {
527                        info!("Fatal worker error - {fatal_error:?}");
528                        let result = Err(FinishedExecutionError::from(fatal_error));
529                        let child_finished =
530                            parent.map(|(parent_execution_id, parent_join_set)| {
531                                ChildFinishedResponse {
532                                    parent_execution_id,
533                                    parent_join_set,
534                                    result: result.clone(),
535                                }
536                            });
537                        (
538                            ExecutionEventInner::Finished {
539                                result,
540                                http_client_traces: None,
541                            },
542                            child_finished,
543                            version,
544                        )
545                    }
546                };
547                Some(Append {
548                    created_at: result_obtained_at,
549                    primary_event: AppendRequest {
550                        created_at: result_obtained_at,
551                        event: primary_event,
552                    },
553                    execution_id,
554                    version,
555                    child_finished,
556                })
557            }
558        })
559    }
560}
561
562#[derive(Debug, Clone)]
563pub(crate) struct ChildFinishedResponse {
564    pub(crate) parent_execution_id: ExecutionId,
565    pub(crate) parent_join_set: JoinSetId,
566    pub(crate) result: FinishedExecutionResult,
567}
568
569#[derive(Debug, Clone)]
570pub(crate) struct Append {
571    pub(crate) created_at: DateTime<Utc>,
572    pub(crate) primary_event: AppendRequest,
573    pub(crate) execution_id: ExecutionId,
574    pub(crate) version: Version,
575    pub(crate) child_finished: Option<ChildFinishedResponse>,
576}
577
578impl Append {
579    pub(crate) async fn append(self, db_connection: &impl DbConnection) -> Result<(), DbError> {
580        if let Some(child_finished) = self.child_finished {
581            assert_matches!(
582                &self.primary_event,
583                AppendRequest {
584                    event: ExecutionEventInner::Finished { .. },
585                    ..
586                }
587            );
588            let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
589            db_connection
590                .append_batch_respond_to_parent(
591                    derived.clone(),
592                    self.created_at,
593                    vec![self.primary_event],
594                    self.version.clone(),
595                    child_finished.parent_execution_id,
596                    JoinSetResponseEventOuter {
597                        created_at: self.created_at,
598                        event: JoinSetResponseEvent {
599                            join_set_id: child_finished.parent_join_set,
600                            event: JoinSetResponse::ChildExecutionFinished {
601                                child_execution_id: derived,
602                                // Since self.primary_event is a finished event, the version will remain the same.
603                                finished_version: self.version,
604                                result: child_finished.result,
605                            },
606                        },
607                    },
608                )
609                .await?;
610        } else {
611            db_connection
612                .append_batch(
613                    self.created_at,
614                    vec![self.primary_event],
615                    self.execution_id,
616                    self.version,
617                )
618                .await?;
619        }
620        Ok(())
621    }
622}
623
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");

    /// Prepared results keyed by the version the worker expects to be called with.
    ///
    /// Entries are consumed from the back (`IndexMap::pop`), i.e. in reverse insertion order.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    /// Test worker that replays prepared results and asserts the context it is run with.
    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    impl SimpleWorker {
        /// Worker returning `res` once, expecting version 2 and an empty event history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(IndexMap::from([(
                Version::new(2),
                (vec![], res),
            )]))))
        }

        /// Replaces the exported function with `ffqn`, keeping the prepared results.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            Self {
                worker_results_rev: self.worker_results_rev,
                exported: exported_metadata(ffqn.clone()),
                ffqn,
            }
        }

        /// Worker exporting `FFQN_SOME` that replays `worker_results_rev`.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: exported_metadata(FFQN_SOME),
            }
        }
    }

    /// Single submittable export with default parameter types and no return type.
    fn exported_metadata(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
        [FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: None,
            extension: None,
            submittable: true,
        }]
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        /// Pops the next prepared result, asserting the expected version and event history.
        ///
        /// # Panics
        /// When no prepared result is left, or the context does not match the expectation.
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            // Separate statement so the mutex guard is released immediately.
            let next = self.worker_results_rev.lock().unwrap().pop();
            let (expected_version, (expected_eh, worker_result)) = next
                .expect("SimpleWorker::run called more times than there are prepared results");
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }

        fn imported_functions(&self) -> &[FunctionMetadata] {
            &[]
        }
    }
}
707
708#[cfg(test)]
709mod tests {
710    use self::simple_worker::SimpleWorker;
711    use super::*;
712    use crate::worker::FatalError;
713    use crate::{expired_timers_watcher, worker::WorkerResult};
714    use assert_matches::assert_matches;
715    use async_trait::async_trait;
716    use concepts::storage::{CreateRequest, JoinSetRequest};
717    use concepts::storage::{
718        DbConnection, ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState,
719    };
720    use concepts::time::Now;
721    use concepts::{
722        ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, StrVariant,
723        SupportedFunctionReturnValue, TrapKind,
724    };
725    use db_tests::Database;
726    use indexmap::IndexMap;
727    use simple_worker::FFQN_SOME;
728    use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
729    use test_utils::set_up;
730    use test_utils::sim_clock::{ConstClock, SimClock};
731
732    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
733
734    async fn tick_fn<
735        W: Worker + Debug,
736        C: ClockFn + 'static,
737        DB: DbConnection + 'static,
738        P: DbPool<DB> + 'static,
739    >(
740        config: ExecConfig,
741        clock_fn: C,
742        db_pool: P,
743        worker: Arc<W>,
744        executed_at: DateTime<Utc>,
745    ) -> ExecutionProgress {
746        trace!("Ticking with {worker:?}");
747        let ffqns = super::extract_ffqns(worker.as_ref());
748        let executor = ExecTask::new(worker, config, clock_fn, db_pool, ffqns);
749        let mut execution_progress = executor.tick(executed_at).await.unwrap();
750        loop {
751            execution_progress
752                .executions
753                .retain(|(_, abort_handle)| !abort_handle.is_finished());
754            if execution_progress.executions.is_empty() {
755                return execution_progress;
756            }
757            tokio::time::sleep(Duration::from_millis(10)).await;
758        }
759    }
760
761    #[tokio::test]
762    async fn execute_simple_lifecycle_tick_based_mem() {
763        let created_at = Now.now();
764        let (_guard, db_pool) = Database::Memory.set_up().await;
765        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
766        db_pool.close().await.unwrap();
767    }
768
769    #[cfg(not(madsim))]
770    #[tokio::test]
771    async fn execute_simple_lifecycle_tick_based_sqlite() {
772        let created_at = Now.now();
773        let (_guard, db_pool) = Database::Sqlite.set_up().await;
774        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
775        db_pool.close().await.unwrap();
776    }
777
778    async fn execute_simple_lifecycle_tick_based<
779        DB: DbConnection + 'static,
780        P: DbPool<DB> + 'static,
781        C: ClockFn + 'static,
782    >(
783        pool: P,
784        clock_fn: C,
785    ) {
786        set_up();
787        let created_at = clock_fn.now();
788        let exec_config = ExecConfig {
789            batch_size: 1,
790            lock_expiry: Duration::from_secs(1),
791            tick_sleep: Duration::from_millis(100),
792            component_id: ComponentId::dummy_activity(),
793            task_limiter: None,
794        };
795
796        let execution_log = create_and_tick(
797            CreateAndTickConfig {
798                execution_id: ExecutionId::generate(),
799                created_at,
800                max_retries: 0,
801                executed_at: created_at,
802                retry_exp_backoff: Duration::ZERO,
803            },
804            clock_fn,
805            pool,
806            exec_config,
807            Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
808                SupportedFunctionReturnValue::None,
809                Version::new(2),
810                None,
811            ))),
812            tick_fn,
813        )
814        .await;
815        assert_matches!(
816            execution_log.events.get(2).unwrap(),
817            ExecutionEvent {
818                event: ExecutionEventInner::Finished {
819                    result: Ok(SupportedFunctionReturnValue::None),
820                    http_client_traces: None
821                },
822                created_at: _,
823                backtrace_id: None,
824            }
825        );
826    }
827
828    #[tokio::test]
829    async fn stochastic_execute_simple_lifecycle_task_based_mem() {
830        set_up();
831        let created_at = Now.now();
832        let clock_fn = ConstClock(created_at);
833        let (_guard, db_pool) = Database::Memory.set_up().await;
834        let exec_config = ExecConfig {
835            batch_size: 1,
836            lock_expiry: Duration::from_secs(1),
837            tick_sleep: Duration::ZERO,
838            component_id: ComponentId::dummy_activity(),
839            task_limiter: None,
840        };
841
842        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
843            SupportedFunctionReturnValue::None,
844            Version::new(2),
845            None,
846        )));
847        let exec_task = ExecTask::spawn_new(
848            worker.clone(),
849            exec_config.clone(),
850            clock_fn,
851            db_pool.clone(),
852            ExecutorId::generate(),
853        );
854
855        let execution_log = create_and_tick(
856            CreateAndTickConfig {
857                execution_id: ExecutionId::generate(),
858                created_at,
859                max_retries: 0,
860                executed_at: created_at,
861                retry_exp_backoff: Duration::ZERO,
862            },
863            clock_fn,
864            db_pool.clone(),
865            exec_config,
866            worker,
867            |_, _, _, _, _| async {
868                tokio::time::sleep(Duration::from_secs(1)).await; // non deterministic if not run in madsim
869                ExecutionProgress::default()
870            },
871        )
872        .await;
873        exec_task.close().await;
874        db_pool.close().await.unwrap();
875        assert_matches!(
876            execution_log.events.get(2).unwrap(),
877            ExecutionEvent {
878                event: ExecutionEventInner::Finished {
879                    result: Ok(SupportedFunctionReturnValue::None),
880                    http_client_traces: None
881                },
882                created_at: _,
883                backtrace_id: None,
884            }
885        );
886    }
887
888    struct CreateAndTickConfig {
889        execution_id: ExecutionId,
890        created_at: DateTime<Utc>,
891        max_retries: u32,
892        executed_at: DateTime<Utc>,
893        retry_exp_backoff: Duration,
894    }
895
896    async fn create_and_tick<
897        W: Worker,
898        C: ClockFn,
899        DB: DbConnection,
900        P: DbPool<DB>,
901        T: FnMut(ExecConfig, C, P, Arc<W>, DateTime<Utc>) -> F,
902        F: Future<Output = ExecutionProgress>,
903    >(
904        config: CreateAndTickConfig,
905        clock_fn: C,
906        db_pool: P,
907        exec_config: ExecConfig,
908        worker: Arc<W>,
909        mut tick: T,
910    ) -> ExecutionLog {
911        // Create an execution
912        let db_connection = db_pool.connection();
913        db_connection
914            .create(CreateRequest {
915                created_at: config.created_at,
916                execution_id: config.execution_id.clone(),
917                ffqn: FFQN_SOME,
918                params: Params::empty(),
919                parent: None,
920                metadata: concepts::ExecutionMetadata::empty(),
921                scheduled_at: config.created_at,
922                retry_exp_backoff: config.retry_exp_backoff,
923                max_retries: config.max_retries,
924                component_id: ComponentId::dummy_activity(),
925                scheduled_by: None,
926            })
927            .await
928            .unwrap();
929        // execute!
930        tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
931        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
932        debug!("Execution history after tick: {execution_log:?}");
933        // check that DB contains Created and Locked events.
934        assert_matches!(
935            execution_log.events.first().unwrap(),
936            ExecutionEvent {
937                event: ExecutionEventInner::Created { .. },
938                created_at: actually_created_at,
939                backtrace_id: None,
940            }
941            if config.created_at == *actually_created_at
942        );
943        let locked_at = assert_matches!(
944            execution_log.events.get(1).unwrap(),
945            ExecutionEvent {
946                event: ExecutionEventInner::Locked { .. },
947                created_at: locked_at,
948                backtrace_id: None,
949            } if config.created_at <= *locked_at
950            => *locked_at
951        );
952        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
953            event: _,
954            created_at: executed_at,
955            backtrace_id: None,
956        } if *executed_at >= locked_at);
957        execution_log
958    }
959
    /// An activity trap with one retry allowed (`max_retries: 1`) must behave as follows:
    /// - record `TemporarilyFailed` with `backoff_expires_at = now + retry_exp_backoff`;
    /// - leave the execution untouched while that backoff is pending;
    /// - lock and finish it successfully once the simulated clock passes the backoff.
    #[expect(clippy::too_many_lines)]
    #[tokio::test]
    async fn activity_trap_should_trigger_an_execution_retry() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };
        let expected_reason = "error reason";
        let expected_detail = "error detail";
        // First attempt: the worker reports an activity trap.
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: expected_detail.to_string(),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        let retry_exp_backoff = Duration::from_millis(100);
        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at: sim_clock.now(),
                max_retries: 1,
                executed_at: sim_clock.now(),
                retry_exp_backoff,
            },
            sim_clock.clone(),
            db_pool.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        // Created, Locked, TemporarilyFailed.
        assert_eq!(3, execution_log.events.len());
        {
            let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
                &execution_log.events.get(2).unwrap(),
                ExecutionEvent {
                    event: ExecutionEventInner::TemporarilyFailed {
                        reason_inner,
                        reason_full,
                        detail,
                        backoff_expires_at,
                        http_client_traces: None,
                    },
                    created_at: at,
                    backtrace_id: None,
                }
                => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
            );
            assert_eq!(expected_reason, reason_inner.deref());
            assert_eq!(
                format!("activity trap: {expected_reason}"),
                reason_full.deref()
            );
            assert_eq!(Some(expected_detail), detail.as_deref());
            assert_eq!(at, sim_clock.now());
            // The retry becomes due exactly one backoff after the failure.
            assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
        }
        // Retry worker: succeeds and reports version 4; the Finished event is checked at index 4
        // below (after Created, Locked, TemporarilyFailed and the second Locked).
        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
            std::sync::Mutex::new(IndexMap::from([(
                Version::new(4),
                (
                    vec![],
                    WorkerResult::Ok(SupportedFunctionReturnValue::None, Version::new(4), None),
                ),
            )])),
        )));
        // noop until `retry_exp_backoff` expires
        assert!(
            tick_fn(
                exec_config.clone(),
                sim_clock.clone(),
                db_pool.clone(),
                worker.clone(),
                sim_clock.now(),
            )
            .await
            .executions
            .is_empty()
        );
        // tick again to finish the execution
        sim_clock.move_time_forward(retry_exp_backoff).await;
        tick_fn(
            exec_config,
            sim_clock.clone(),
            db_pool.clone(),
            worker,
            sim_clock.now(),
        )
        .await;
        let execution_log = {
            let db_connection = db_pool.connection();
            db_connection
                .get(&execution_log.execution_id)
                .await
                .unwrap()
        };
        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
        // The retry is locked and finished at the time the backoff expired.
        assert_matches!(
            execution_log.events.get(3).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: at,
                backtrace_id: None,
            } if *at == sim_clock.now()
        );
        assert_matches!(
            execution_log.events.get(4).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Ok(SupportedFunctionReturnValue::None),
                    http_client_traces: None
                },
                created_at: finished_at,
                backtrace_id: None,
            } if *finished_at == sim_clock.now()
        );
        db_pool.close().await.unwrap();
    }
1088
1089    #[tokio::test]
1090    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1091        set_up();
1092        let created_at = Now.now();
1093        let clock_fn = ConstClock(created_at);
1094        let (_guard, db_pool) = Database::Memory.set_up().await;
1095        let exec_config = ExecConfig {
1096            batch_size: 1,
1097            lock_expiry: Duration::from_secs(1),
1098            tick_sleep: Duration::ZERO,
1099            component_id: ComponentId::dummy_activity(),
1100            task_limiter: None,
1101        };
1102
1103        let expected_reason = "error reason";
1104        let expected_detail = "error detail";
1105        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1106            WorkerError::ActivityTrap {
1107                reason: expected_reason.to_string(),
1108                trap_kind: concepts::TrapKind::Trap,
1109                detail: expected_detail.to_string(),
1110                version: Version::new(2),
1111                http_client_traces: None,
1112            },
1113        )));
1114        let execution_log = create_and_tick(
1115            CreateAndTickConfig {
1116                execution_id: ExecutionId::generate(),
1117                created_at,
1118                max_retries: 0,
1119                executed_at: created_at,
1120                retry_exp_backoff: Duration::ZERO,
1121            },
1122            clock_fn,
1123            db_pool.clone(),
1124            exec_config.clone(),
1125            worker,
1126            tick_fn,
1127        )
1128        .await;
1129        assert_eq!(3, execution_log.events.len());
1130        let (reason, kind, detail) = assert_matches!(
1131            &execution_log.events.get(2).unwrap(),
1132            ExecutionEvent {
1133                event: ExecutionEventInner::Finished{
1134                    result: Err(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_}),
1135                    http_client_traces: None
1136                },
1137                created_at: at,
1138                backtrace_id: None,
1139            } if *at == created_at
1140            => (reason_inner, kind, detail)
1141        );
1142        assert_eq!(expected_reason, *reason);
1143        assert_eq!(Some(expected_detail), detail.as_deref());
1144        assert_eq!(PermanentFailureKind::ActivityTrap, *kind);
1145
1146        db_pool.close().await.unwrap();
1147    }
1148
1149    #[tokio::test]
1150    async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1151        let worker_error = WorkerError::ActivityTrap {
1152            reason: "error reason".to_string(),
1153            trap_kind: TrapKind::Trap,
1154            detail: "detail".to_string(),
1155            version: Version::new(2),
1156            http_client_traces: None,
1157        };
1158        let expected_child_err = FinishedExecutionError::PermanentFailure {
1159            reason_full: "activity trap: error reason".to_string(),
1160            reason_inner: "error reason".to_string(),
1161            kind: PermanentFailureKind::ActivityTrap,
1162            detail: Some("detail".to_string()),
1163        };
1164        child_execution_permanently_failed_should_notify_parent(
1165            WorkerResult::Err(worker_error),
1166            expected_child_err,
1167        )
1168        .await;
1169    }
1170
1171    // TODO: Add test for WorkerError::TemporaryTimeout
1172
1173    #[tokio::test]
1174    async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1175        let expected_child_err = FinishedExecutionError::PermanentTimeout;
1176        child_execution_permanently_failed_should_notify_parent(
1177            WorkerResult::DbUpdatedByWorkerOrWatcher,
1178            expected_child_err,
1179        )
1180        .await;
1181    }
1182
1183    #[tokio::test]
1184    async fn child_execution_permanently_failed_should_notify_parent_unhandled_child() {
1185        let parent_id = ExecutionId::from_parts(1, 1);
1186        let join_set_id_outer =
1187            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("outer")).unwrap();
1188        let root_cause_id = parent_id.next_level(&join_set_id_outer);
1189        let join_set_id_inner =
1190            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("inner")).unwrap();
1191        let child_execution_id = root_cause_id.next_level(&join_set_id_inner);
1192        let worker_error = WorkerError::FatalError(
1193            FatalError::UnhandledChildExecutionError {
1194                child_execution_id: child_execution_id.clone(),
1195                root_cause_id: root_cause_id.clone(),
1196            },
1197            Version::new(2),
1198        );
1199        let expected_child_err = FinishedExecutionError::UnhandledChildExecutionError {
1200            child_execution_id,
1201            root_cause_id,
1202        };
1203        child_execution_permanently_failed_should_notify_parent(
1204            WorkerResult::Err(worker_error),
1205            expected_child_err,
1206        )
1207        .await;
1208    }
1209
1210    #[expect(clippy::too_many_lines)]
1211    async fn child_execution_permanently_failed_should_notify_parent(
1212        worker_result: WorkerResult,
1213        expected_child_err: FinishedExecutionError,
1214    ) {
1215        use concepts::storage::JoinSetResponseEventOuter;
1216        const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1217
1218        set_up();
1219        let sim_clock = SimClock::default();
1220        let (_guard, db_pool) = Database::Memory.set_up().await;
1221
1222        let parent_worker = Arc::new(SimpleWorker::with_single_result(
1223            WorkerResult::DbUpdatedByWorkerOrWatcher,
1224        ));
1225        let parent_execution_id = ExecutionId::generate();
1226        db_pool
1227            .connection()
1228            .create(CreateRequest {
1229                created_at: sim_clock.now(),
1230                execution_id: parent_execution_id.clone(),
1231                ffqn: FFQN_SOME,
1232                params: Params::empty(),
1233                parent: None,
1234                metadata: concepts::ExecutionMetadata::empty(),
1235                scheduled_at: sim_clock.now(),
1236                retry_exp_backoff: Duration::ZERO,
1237                max_retries: 0,
1238                component_id: ComponentId::dummy_activity(),
1239                scheduled_by: None,
1240            })
1241            .await
1242            .unwrap();
1243        tick_fn(
1244            ExecConfig {
1245                batch_size: 1,
1246                lock_expiry: LOCK_EXPIRY,
1247                tick_sleep: Duration::ZERO,
1248                component_id: ComponentId::dummy_activity(),
1249                task_limiter: None,
1250            },
1251            sim_clock.clone(),
1252            db_pool.clone(),
1253            parent_worker,
1254            sim_clock.now(),
1255        )
1256        .await;
1257
1258        let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1259        let child_execution_id = parent_execution_id.next_level(&join_set_id);
1260        // executor does not append anything, this should have been written by the worker:
1261        {
1262            let child = CreateRequest {
1263                created_at: sim_clock.now(),
1264                execution_id: ExecutionId::Derived(child_execution_id.clone()),
1265                ffqn: FFQN_CHILD,
1266                params: Params::empty(),
1267                parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1268                metadata: concepts::ExecutionMetadata::empty(),
1269                scheduled_at: sim_clock.now(),
1270                retry_exp_backoff: Duration::ZERO,
1271                max_retries: 0,
1272                component_id: ComponentId::dummy_activity(),
1273                scheduled_by: None,
1274            };
1275            let current_time = sim_clock.now();
1276            let join_set = AppendRequest {
1277                created_at: current_time,
1278                event: ExecutionEventInner::HistoryEvent {
1279                    event: HistoryEvent::JoinSetCreate {
1280                        join_set_id: join_set_id.clone(),
1281                        closing_strategy: ClosingStrategy::Complete,
1282                    },
1283                },
1284            };
1285            let child_exec_req = AppendRequest {
1286                created_at: current_time,
1287                event: ExecutionEventInner::HistoryEvent {
1288                    event: HistoryEvent::JoinSetRequest {
1289                        join_set_id: join_set_id.clone(),
1290                        request: JoinSetRequest::ChildExecutionRequest {
1291                            child_execution_id: child_execution_id.clone(),
1292                        },
1293                    },
1294                },
1295            };
1296            let join_next = AppendRequest {
1297                created_at: current_time,
1298                event: ExecutionEventInner::HistoryEvent {
1299                    event: HistoryEvent::JoinNext {
1300                        join_set_id: join_set_id.clone(),
1301                        run_expires_at: sim_clock.now(),
1302                        closing: false,
1303                    },
1304                },
1305            };
1306            db_pool
1307                .connection()
1308                .append_batch_create_new_execution(
1309                    current_time,
1310                    vec![join_set, child_exec_req, join_next],
1311                    parent_execution_id.clone(),
1312                    Version::new(2),
1313                    vec![child],
1314                )
1315                .await
1316                .unwrap();
1317        }
1318
1319        let child_worker =
1320            Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));
1321
1322        // execute the child
1323        tick_fn(
1324            ExecConfig {
1325                batch_size: 1,
1326                lock_expiry: LOCK_EXPIRY,
1327                tick_sleep: Duration::ZERO,
1328                component_id: ComponentId::dummy_activity(),
1329                task_limiter: None,
1330            },
1331            sim_clock.clone(),
1332            db_pool.clone(),
1333            child_worker,
1334            sim_clock.now(),
1335        )
1336        .await;
1337        if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
1338            // In case of timeout, let the timers watcher handle it
1339            sim_clock.move_time_forward(LOCK_EXPIRY).await;
1340            expired_timers_watcher::tick(&db_pool.connection(), sim_clock.now())
1341                .await
1342                .unwrap();
1343        }
1344        let child_log = db_pool
1345            .connection()
1346            .get(&ExecutionId::Derived(child_execution_id.clone()))
1347            .await
1348            .unwrap();
1349        assert!(child_log.pending_state.is_finished());
1350        assert_eq!(
1351            Version(2),
1352            child_log.next_version,
1353            "created = 0, locked = 1, with_single_result = 2"
1354        );
1355        assert_eq!(
1356            ExecutionEventInner::Finished {
1357                result: Err(expected_child_err),
1358                http_client_traces: None
1359            },
1360            child_log.last_event().event
1361        );
1362        let parent_log = db_pool
1363            .connection()
1364            .get(&parent_execution_id)
1365            .await
1366            .unwrap();
1367        assert_matches!(
1368            parent_log.pending_state,
1369            PendingState::PendingAt {
1370                scheduled_at
1371            } if scheduled_at == sim_clock.now(),
1372            "parent should be back to pending"
1373        );
1374        let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1375            parent_log.responses.last(),
1376            Some(JoinSetResponseEventOuter{
1377                created_at: at,
1378                event: JoinSetResponseEvent{
1379                    join_set_id: found_join_set_id,
1380                    event: JoinSetResponse::ChildExecutionFinished {
1381                        child_execution_id: found_child_execution_id,
1382                        finished_version,
1383                        result: found_result,
1384                    }
1385                }
1386            })
1387             if *at == sim_clock.now()
1388            => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1389        );
1390        assert_eq!(join_set_id, *found_join_set_id);
1391        assert_eq!(child_execution_id, *found_child_execution_id);
1392        assert_eq!(child_log.next_version, *child_finished_version);
1393        assert!(found_result.is_err());
1394
1395        db_pool.close().await.unwrap();
1396    }
1397
    /// Test worker whose `run` sleeps for `duration` (via `tokio::time::sleep`) and then returns
    /// `result`. Used to simulate an execution that outlives its lock.
    #[derive(Clone, Debug)]
    struct SleepyWorker {
        // How long `run` sleeps before returning.
        duration: Duration,
        // Value returned by `run` once the sleep completes.
        result: SupportedFunctionReturnValue,
        // Metadata reported by `exported_functions`.
        exported: [FunctionMetadata; 1],
    }
1404
1405    #[async_trait]
1406    impl Worker for SleepyWorker {
1407        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1408            tokio::time::sleep(self.duration).await;
1409            WorkerResult::Ok(self.result.clone(), ctx.version, None)
1410        }
1411
1412        fn exported_functions(&self) -> &[FunctionMetadata] {
1413            &self.exported
1414        }
1415
1416        fn imported_functions(&self) -> &[FunctionMetadata] {
1417            &[]
1418        }
1419    }
1420
1421    #[expect(clippy::too_many_lines)]
1422    #[tokio::test]
1423    async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1424        set_up();
1425        let sim_clock = SimClock::default();
1426        let (_guard, db_pool) = Database::Memory.set_up().await;
1427        let lock_expiry = Duration::from_millis(100);
1428        let exec_config = ExecConfig {
1429            batch_size: 1,
1430            lock_expiry,
1431            tick_sleep: Duration::ZERO,
1432            component_id: ComponentId::dummy_activity(),
1433            task_limiter: None,
1434        };
1435
1436        let worker = Arc::new(SleepyWorker {
1437            duration: lock_expiry + Duration::from_millis(1), // sleep more than allowed by the lock expiry
1438            result: SupportedFunctionReturnValue::None,
1439            exported: [FunctionMetadata {
1440                ffqn: FFQN_SOME,
1441                parameter_types: ParameterTypes::default(),
1442                return_type: None,
1443                extension: None,
1444                submittable: true,
1445            }],
1446        });
1447        // Create an execution
1448        let execution_id = ExecutionId::generate();
1449        let timeout_duration = Duration::from_millis(300);
1450        let db_connection = db_pool.connection();
1451        db_connection
1452            .create(CreateRequest {
1453                created_at: sim_clock.now(),
1454                execution_id: execution_id.clone(),
1455                ffqn: FFQN_SOME,
1456                params: Params::empty(),
1457                parent: None,
1458                metadata: concepts::ExecutionMetadata::empty(),
1459                scheduled_at: sim_clock.now(),
1460                retry_exp_backoff: timeout_duration,
1461                max_retries: 1,
1462                component_id: ComponentId::dummy_activity(),
1463                scheduled_by: None,
1464            })
1465            .await
1466            .unwrap();
1467
1468        let ffqns = super::extract_ffqns(worker.as_ref());
1469        let executor = ExecTask::new(
1470            worker,
1471            exec_config,
1472            sim_clock.clone(),
1473            db_pool.clone(),
1474            ffqns,
1475        );
1476        let mut first_execution_progress = executor.tick(sim_clock.now()).await.unwrap();
1477        assert_eq!(1, first_execution_progress.executions.len());
1478        // Started hanging, wait for lock expiry.
1479        sim_clock.move_time_forward(lock_expiry).await;
1480        // cleanup should be called
1481        let now_after_first_lock_expiry = sim_clock.now();
1482        {
1483            debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1484            let cleanup_progress = executor.tick(now_after_first_lock_expiry).await.unwrap();
1485            assert!(cleanup_progress.executions.is_empty());
1486        }
1487        {
1488            let expired_locks =
1489                expired_timers_watcher::tick(&db_pool.connection(), now_after_first_lock_expiry)
1490                    .await
1491                    .unwrap()
1492                    .expired_locks;
1493            assert_eq!(1, expired_locks);
1494        }
1495        assert!(
1496            !first_execution_progress
1497                .executions
1498                .pop()
1499                .unwrap()
1500                .1
1501                .is_finished()
1502        );
1503
1504        let execution_log = db_connection.get(&execution_id).await.unwrap();
1505        let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1506        assert_matches!(
1507            &execution_log.events.get(2).unwrap(),
1508            ExecutionEvent {
1509                event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
1510                created_at: at,
1511                backtrace_id: None,
1512            } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1513        );
1514        assert_eq!(
1515            PendingState::PendingAt {
1516                scheduled_at: expected_first_timeout_expiry
1517            },
1518            execution_log.pending_state
1519        );
1520        sim_clock.move_time_forward(timeout_duration).await;
1521        let now_after_first_timeout = sim_clock.now();
1522        debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1523
1524        let mut second_execution_progress = executor.tick(now_after_first_timeout).await.unwrap();
1525        assert_eq!(1, second_execution_progress.executions.len());
1526
1527        // Started hanging, wait for lock expiry.
1528        sim_clock.move_time_forward(lock_expiry).await;
1529        // cleanup should be called
1530        let now_after_second_lock_expiry = sim_clock.now();
1531        debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1532        {
1533            let cleanup_progress = executor.tick(now_after_second_lock_expiry).await.unwrap();
1534            assert!(cleanup_progress.executions.is_empty());
1535        }
1536        {
1537            let expired_locks =
1538                expired_timers_watcher::tick(&db_pool.connection(), now_after_second_lock_expiry)
1539                    .await
1540                    .unwrap()
1541                    .expired_locks;
1542            assert_eq!(1, expired_locks);
1543        }
1544        assert!(
1545            !second_execution_progress
1546                .executions
1547                .pop()
1548                .unwrap()
1549                .1
1550                .is_finished()
1551        );
1552
1553        drop(db_connection);
1554        drop(executor);
1555        db_pool.close().await.unwrap();
1556    }
1557}