obeli_sk_executor/
executor.rs

1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::storage::{
5    AppendRequest, DbPool, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
6    LockedExecution,
7};
8use concepts::time::ClockFn;
9use concepts::{ComponentId, FinishedExecutionResult, FunctionMetadata, StrVariant};
10use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
11use concepts::{
12    FinishedExecutionError,
13    storage::{DbConnection, DbError, ExecutionEventInner, JoinSetResponse, Version},
14};
15use concepts::{JoinSetId, PermanentFailureKind};
16use std::marker::PhantomData;
17use std::{
18    sync::{
19        Arc,
20        atomic::{AtomicBool, Ordering},
21    },
22    time::Duration,
23};
24use tokio::task::{AbortHandle, JoinHandle};
25use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
26
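/// Configuration of a single executor task.
///
/// As used below: `batch_size` bounds how many pending executions one tick may
/// lock, `lock_expiry` becomes both the lock expiry and the execution deadline,
/// `tick_sleep` is the idle wait between polls, and `task_limiter` (if set)
/// caps how many worker tasks may run concurrently.
///
/// A construction sketch mirroring the values used in the tests below (the
/// `ComponentId::dummy_activity()` helper comes from those tests):
///
/// ```ignore
/// let exec_config = ExecConfig {
///     batch_size: 1,
///     lock_expiry: Duration::from_secs(1),
///     tick_sleep: Duration::from_millis(100),
///     component_id: ComponentId::dummy_activity(),
///     task_limiter: None,
/// };
/// ```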
27#[derive(Debug, Clone)]
28pub struct ExecConfig {
29    pub lock_expiry: Duration,
30    pub tick_sleep: Duration,
31    pub batch_size: u32,
32    pub component_id: ComponentId,
33    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
34}
35
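/// A single executor bound to one worker and the set of functions it exports.
///
/// `spawn_new` runs a loop that repeatedly ticks (locking and executing pending
/// executions) and then waits for new pending work via `wait_for_pending`.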
36pub struct ExecTask<C: ClockFn, DB: DbConnection, P: DbPool<DB>> {
37    worker: Arc<dyn Worker>,
38    config: ExecConfig,
39    clock_fn: C, // Used for obtaining current time when the execution finishes.
40    db_pool: P,
41    executor_id: ExecutorId,
42    phantom_data: PhantomData<DB>,
43    ffqns: Arc<[FunctionFqn]>,
44}
45
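/// Join handles of the worker tasks spawned by a single tick; test builds can
/// await them via `wait_for_tasks`.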
46#[derive(derive_more::Debug, Default)]
47pub struct ExecutionProgress {
48    #[debug(skip)]
49    #[allow(dead_code)]
50    executions: Vec<(ExecutionId, JoinHandle<()>)>,
51}
52
53impl ExecutionProgress {
54    #[cfg(feature = "test")]
55    pub async fn wait_for_tasks(self) -> Result<usize, tokio::task::JoinError> {
56        let execs = self.executions.len();
57        for (_, handle) in self.executions {
58            handle.await?;
59        }
60        Ok(execs)
61    }
62}
63
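/// Handle of a spawned executor loop.
///
/// `close` requests a graceful shutdown and waits until the loop has finished;
/// dropping the handle without closing aborts the task instead.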
64#[derive(derive_more::Debug)]
65pub struct ExecutorTaskHandle {
66    #[debug(skip)]
67    is_closing: Arc<AtomicBool>,
68    #[debug(skip)]
69    abort_handle: AbortHandle,
70    component_id: ComponentId,
71    executor_id: ExecutorId,
72}
73
74impl ExecutorTaskHandle {
75    #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
76    pub async fn close(&self) {
77        trace!("Gracefully closing");
78        self.is_closing.store(true, Ordering::Relaxed);
79        while !self.abort_handle.is_finished() {
80            tokio::time::sleep(Duration::from_millis(1)).await;
81        }
82        debug!("Gracefully closed");
83    }
84}
85
86impl Drop for ExecutorTaskHandle {
87    #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
88    fn drop(&mut self) {
89        if self.abort_handle.is_finished() {
90            return;
91        }
92        warn!(executor_id = %self.executor_id, component_id = %self.component_id, "Aborting the executor task");
93        self.abort_handle.abort();
94    }
95}
96
97#[cfg(feature = "test")]
98pub fn extract_ffqns_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
99    extract_ffqns(worker)
100}
101
102pub(crate) fn extract_ffqns(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
103    worker
104        .exported_functions()
105        .iter()
106        .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
107        .collect::<Arc<_>>()
108}
109
110impl<C: ClockFn + 'static, DB: DbConnection + 'static, P: DbPool<DB> + 'static> ExecTask<C, DB, P> {
111    #[cfg(feature = "test")]
112    pub fn new(
113        worker: Arc<dyn Worker>,
114        config: ExecConfig,
115        clock_fn: C,
116        db_pool: P,
117        ffqns: Arc<[FunctionFqn]>,
118    ) -> Self {
119        Self {
120            worker,
121            config,
122            executor_id: ExecutorId::generate(),
123            db_pool,
124            phantom_data: PhantomData,
125            ffqns,
126            clock_fn,
127        }
128    }
129
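    /// Spawns the executor loop and returns a handle used to stop it.
    ///
    /// A minimal usage sketch (not compiled; `my_worker`, `clock` and `db_pool`
    /// stand in for concrete `Arc<dyn Worker>`, `ClockFn` and `DbPool` values):
    ///
    /// ```ignore
    /// let handle = ExecTask::spawn_new(
    ///     my_worker,
    ///     exec_config,
    ///     clock,
    ///     db_pool,
    ///     ExecutorId::generate(),
    /// );
    /// // ... later, on shutdown:
    /// handle.close().await;
    /// ```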
130    pub fn spawn_new(
131        worker: Arc<dyn Worker>,
132        config: ExecConfig,
133        clock_fn: C,
134        db_pool: P,
135        executor_id: ExecutorId,
136    ) -> ExecutorTaskHandle {
137        let is_closing = Arc::new(AtomicBool::default());
138        let is_closing_inner = is_closing.clone();
139        let ffqns = extract_ffqns(worker.as_ref());
140        let component_id = config.component_id.clone();
141        let abort_handle = tokio::spawn(async move {
142            debug!(%executor_id, component_id = %config.component_id, "Spawned executor");
143            let task = Self {
144                worker,
145                config,
146                executor_id,
147                db_pool,
148                phantom_data: PhantomData,
149                ffqns: ffqns.clone(),
150                clock_fn: clock_fn.clone(),
151            };
152            loop {
153                let _ = task.tick(clock_fn.now()).await;
154                let executed_at = clock_fn.now();
155                task.db_pool
156                    .connection()
157                    .wait_for_pending(executed_at, ffqns.clone(), task.config.tick_sleep)
158                    .await;
159                if is_closing_inner.load(Ordering::Relaxed) {
160                    return;
161                }
162            }
163        })
164        .abort_handle();
165        ExecutorTaskHandle {
166            is_closing,
167            abort_handle,
168            component_id,
169            executor_id,
170        }
171    }
172
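    // Returns one slot per execution that may be locked in this tick. With a
    // task limiter the slots carry owned semaphore permits (possibly fewer than
    // `batch_size` when the semaphore is exhausted); without one, exactly
    // `batch_size` empty slots are returned.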
173    fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
174        if let Some(task_limiter) = &self.config.task_limiter {
175            let mut locks = Vec::new();
176            for _ in 0..self.config.batch_size {
177                if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
178                    locks.push(Some(permit));
179                } else {
180                    break;
181                }
182            }
183            locks
184        } else {
185            let mut vec = Vec::with_capacity(self.config.batch_size as usize);
186            for _ in 0..self.config.batch_size {
187                vec.push(None);
188            }
189            vec
190        }
191    }
192
193    #[cfg(feature = "test")]
194    pub async fn tick_test(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
195        self.tick(executed_at).await
196    }
197
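    // Locks up to `batch_size` pending executions (bounded by the available
    // permits) and spawns one worker task per locked execution, each
    // instrumented with its own span and holding its permit until it finishes.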
198    #[instrument(level = Level::TRACE, name = "executor.tick", skip_all, fields(executor_id = %self.executor_id, component_id = %self.config.component_id))]
199    async fn tick(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
200        let locked_executions = {
201            let mut permits = self.acquire_task_permits();
202            if permits.is_empty() {
203                return Ok(ExecutionProgress::default());
204            }
205            let db_connection = self.db_pool.connection();
206            let lock_expires_at = executed_at + self.config.lock_expiry;
207            let locked_executions = db_connection
208                .lock_pending(
209                    permits.len(), // batch size
210                    executed_at,   // fetch expiring before now
211                    self.ffqns.clone(),
212                    executed_at, // created at
213                    self.config.component_id.clone(),
214                    self.executor_id,
215                    lock_expires_at,
216                )
217                .await
218                .map_err(|err| {
219                    warn!(executor_id = %self.executor_id, component_id = %self.config.component_id, "lock_pending error {err:?}");
220                })?;
221            // Drop permits if too many were allocated.
222            while permits.len() > locked_executions.len() {
223                permits.pop();
224            }
225            assert_eq!(permits.len(), locked_executions.len());
226            locked_executions.into_iter().zip(permits)
227        };
228        let execution_deadline = executed_at + self.config.lock_expiry;
229
230        let mut executions = Vec::with_capacity(locked_executions.len());
231        for (locked_execution, permit) in locked_executions {
232            let execution_id = locked_execution.execution_id.clone();
233            let join_handle = {
234                let worker = self.worker.clone();
235                let db_pool = self.db_pool.clone();
236                let clock_fn = self.clock_fn.clone();
237                let run_id = locked_execution.run_id;
238                let worker_span = info_span!(parent: None, "worker",
239                    "otel.name" = format!("worker {}", locked_execution.ffqn),
240                    %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.executor_id, component_id = %self.config.component_id);
241                locked_execution.metadata.enrich(&worker_span);
242                tokio::spawn({
243                    let worker_span2 = worker_span.clone();
244                    async move {
245                        let res = Self::run_worker(
246                            worker,
247                            &db_pool,
248                            execution_deadline,
249                            clock_fn,
250                            locked_execution,
251                            worker_span2,
252                        )
253                        .await;
254                        if let Err(db_error) = res {
255                            error!("Failed to write the worker result, the execution will eventually time out: `{db_error:?}`");
256                        }
257                        drop(permit);
258                    }
259                    .instrument(worker_span)
260                })
261            };
262            executions.push((execution_id, join_handle));
263        }
264        Ok(ExecutionProgress { executions })
265    }
266
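    // Builds the `WorkerContext` from the locked execution, runs the worker and
    // maps its result to an optional `Append` that is then written to the
    // database. The retry backoff is computed up front from the temporary event
    // count and the configured exponential backoff.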
267    async fn run_worker(
268        worker: Arc<dyn Worker>,
269        db_pool: &P,
270        execution_deadline: DateTime<Utc>,
271        clock_fn: C,
272        locked_execution: LockedExecution,
273        worker_span: Span,
274    ) -> Result<(), DbError> {
275        debug!("Worker::run starting");
276        trace!(
277            version = %locked_execution.version,
278            params = ?locked_execution.params,
279            event_history = ?locked_execution.event_history,
280            "Worker::run starting"
281        );
282        let can_be_retried = ExecutionLog::can_be_retried_after(
283            locked_execution.temporary_event_count + 1,
284            locked_execution.max_retries,
285            locked_execution.retry_exp_backoff,
286        );
287        let unlock_expiry_on_limit_reached =
288            ExecutionLog::compute_retry_duration_when_retrying_forever(
289                locked_execution.temporary_event_count + 1,
290                locked_execution.retry_exp_backoff,
291            );
292        let ctx = WorkerContext {
293            execution_id: locked_execution.execution_id.clone(),
294            metadata: locked_execution.metadata,
295            ffqn: locked_execution.ffqn,
296            params: locked_execution.params,
297            event_history: locked_execution.event_history,
298            responses: locked_execution
299                .responses
300                .into_iter()
301                .map(|outer| outer.event)
302                .collect(),
303            version: locked_execution.version,
304            execution_deadline,
305            can_be_retried: can_be_retried.is_some(),
306            run_id: locked_execution.run_id,
307            worker_span,
308        };
309        let worker_result = worker.run(ctx).await;
310        trace!(?worker_result, "Worker::run finished");
311        let result_obtained_at = clock_fn.now();
312        match Self::worker_result_to_execution_event(
313            locked_execution.execution_id,
314            worker_result,
315            result_obtained_at,
316            locked_execution.parent,
317            can_be_retried,
318            unlock_expiry_on_limit_reached,
319        )? {
320            Some(append) => {
321                let db_connection = db_pool.connection();
322                trace!("Appending {append:?}");
323                append.clone().append(&db_connection).await
324            }
325            None => Ok(()),
326        }
327    }
328
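    // Mapping implemented below:
    //   Ok                               -> Finished(Ok), plus a parent response for child executions
    //   DbUpdatedByWorker                -> no append
    //   TemporaryTimeoutHandledByWatcher -> no append
    //   TemporaryTimeout                 -> TemporarilyTimedOut, or Finished(PermanentTimeout) when retries are exhausted
    //   DbError                          -> propagated as Err
    //   ActivityTrap                     -> TemporarilyFailed, or Finished(PermanentFailure) when retries are exhausted
    //   ActivityReturnedError            -> TemporarilyFailed (retries must remain)
    //   TemporaryWorkflowTrap            -> TemporarilyFailed (workflows are retried forever)
    //   LimitReached                     -> Unlocked with a backoff
    //   FatalError                       -> Finished(Err), plus a parent response for child executions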
329    // FIXME: On a slow execution there is a race between `expired_timers_watcher` and this executor if `retry_exp_backoff` is 0.
330    /// Map the `WorkerError` to a temporary or a permanent failure.
331    #[expect(clippy::too_many_lines)]
332    fn worker_result_to_execution_event(
333        execution_id: ExecutionId,
334        worker_result: WorkerResult,
335        result_obtained_at: DateTime<Utc>,
336        parent: Option<(ExecutionId, JoinSetId)>,
337        can_be_retried: Option<Duration>,
338        unlock_expiry_on_limit_reached: Duration,
339    ) -> Result<Option<Append>, DbError> {
340        Ok(match worker_result {
341            WorkerResult::Ok(result, new_version, http_client_traces) => {
342                info!(
343                    "Execution finished: {}",
344                    result.as_pending_state_finished_result()
345                );
346                let child_finished =
347                    parent.map(
348                        |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
349                            parent_execution_id,
350                            parent_join_set,
351                            result: Ok(result.clone()),
352                        },
353                    );
354                let primary_event = AppendRequest {
355                    created_at: result_obtained_at,
356                    event: ExecutionEventInner::Finished {
357                        result: Ok(result),
358                        http_client_traces,
359                    },
360                };
361
362                Some(Append {
363                    created_at: result_obtained_at,
364                    primary_event,
365                    execution_id,
366                    version: new_version,
367                    child_finished,
368                })
369            }
370            WorkerResult::DbUpdatedByWorker => None,
371            WorkerResult::Err(err) => {
372                let reason = err.to_string();
373                let (primary_event, child_finished, version) = match err {
374                    WorkerError::TemporaryTimeoutHandledByWatcher => {
375                        info!("Temporary timeout handled by watcher");
376                        return Ok(None);
377                    }
378                    WorkerError::TemporaryTimeout {
379                        http_client_traces,
380                        version,
381                    } => {
382                        if let Some(duration) = can_be_retried {
383                            let backoff_expires_at = result_obtained_at + duration;
384                            info!("Temporary timeout after {duration:?} at {backoff_expires_at}");
385                            (
386                                ExecutionEventInner::TemporarilyTimedOut {
387                                    backoff_expires_at,
388                                    http_client_traces,
389                                },
390                                None,
391                                version,
392                            )
393                        } else {
394                            info!("Permanent timeout");
395                            let result = Err(FinishedExecutionError::PermanentTimeout);
396                            let child_finished =
397                                parent.map(|(parent_execution_id, parent_join_set)| {
398                                    ChildFinishedResponse {
399                                        parent_execution_id,
400                                        parent_join_set,
401                                        result: result.clone(),
402                                    }
403                                });
404                            (
405                                ExecutionEventInner::Finished {
406                                    result,
407                                    http_client_traces,
408                                },
409                                child_finished,
410                                version,
411                            )
412                        }
413                    }
414                    WorkerError::DbError(db_error) => {
415                        return Err(db_error);
416                    }
417                    WorkerError::ActivityTrap {
418                        reason: reason_inner,
419                        trap_kind,
420                        detail,
421                        version,
422                        http_client_traces,
423                    } => {
424                        if let Some(duration) = can_be_retried {
425                            let expires_at = result_obtained_at + duration;
426                            debug!(
427                                "Retrying activity {trap_kind} execution after {duration:?} at {expires_at}"
428                            );
429                            (
430                                ExecutionEventInner::TemporarilyFailed {
431                                    backoff_expires_at: expires_at,
432                                    reason_full: StrVariant::from(reason),
433                                    reason_inner: StrVariant::from(reason_inner),
434                                    detail: Some(detail),
435                                    http_client_traces,
436                                },
437                                None,
438                                version,
439                            )
440                        } else {
441                            info!(
442                                "Activity {trap_kind} marked as permanent failure - {reason_inner}"
443                            );
444                            let result = Err(FinishedExecutionError::PermanentFailure {
445                                reason_inner,
446                                reason_full: reason,
447                                kind: PermanentFailureKind::ActivityTrap,
448                                detail: Some(detail),
449                            });
450                            let child_finished =
451                                parent.map(|(parent_execution_id, parent_join_set)| {
452                                    ChildFinishedResponse {
453                                        parent_execution_id,
454                                        parent_join_set,
455                                        result: result.clone(),
456                                    }
457                                });
458                            (
459                                ExecutionEventInner::Finished {
460                                    result,
461                                    http_client_traces,
462                                },
463                                child_finished,
464                                version,
465                            )
466                        }
467                    }
468                    WorkerError::ActivityReturnedError {
469                        detail,
470                        version,
471                        http_client_traces,
472                    } => {
473                        let duration = can_be_retried.expect(
474                            "ActivityReturnedError must not be returned when retries are exhausted",
475                        );
476                        let expires_at = result_obtained_at + duration;
477                        debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
478                        (
479                            ExecutionEventInner::TemporarilyFailed {
480                                backoff_expires_at: expires_at,
481                                reason_full: StrVariant::Static("activity returned error"),
482                                reason_inner: StrVariant::Static("activity returned error"),
483                                detail,
484                                http_client_traces,
485                            },
486                            None,
487                            version,
488                        )
489                    }
490                    WorkerError::TemporaryWorkflowTrap {
491                        reason: reason_inner,
492                        kind,
493                        detail,
494                        version,
495                    } => {
496                        let duration = can_be_retried.expect("workflows are retried forever");
497                        let expires_at = result_obtained_at + duration;
498                        debug!(
499                            "Retrying workflow {kind} execution after {duration:?} at {expires_at}"
500                        );
501                        (
502                            ExecutionEventInner::TemporarilyFailed {
503                                backoff_expires_at: expires_at,
504                                reason_full: StrVariant::from(reason),
505                                reason_inner: StrVariant::from(reason_inner),
506                                detail,
507                                http_client_traces: None,
508                            },
509                            None,
510                            version,
511                        )
512                    }
513                    WorkerError::LimitReached {
514                        reason: inner_reason,
515                        version: new_version,
516                    } => {
517                        let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
518                        warn!(
519                            "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
520                        );
521                        (
522                            ExecutionEventInner::Unlocked {
523                                backoff_expires_at: expires_at,
524                                reason: StrVariant::from(reason),
525                            },
526                            None,
527                            new_version,
528                        )
529                    }
530                    WorkerError::FatalError(fatal_error, version) => {
531                        info!("Fatal worker error - {fatal_error:?}");
532                        let result = Err(FinishedExecutionError::from(fatal_error));
533                        let child_finished =
534                            parent.map(|(parent_execution_id, parent_join_set)| {
535                                ChildFinishedResponse {
536                                    parent_execution_id,
537                                    parent_join_set,
538                                    result: result.clone(),
539                                }
540                            });
541                        (
542                            ExecutionEventInner::Finished {
543                                result,
544                                http_client_traces: None,
545                            },
546                            child_finished,
547                            version,
548                        )
549                    }
550                };
551                Some(Append {
552                    created_at: result_obtained_at,
553                    primary_event: AppendRequest {
554                        created_at: result_obtained_at,
555                        event: primary_event,
556                    },
557                    execution_id,
558                    version,
559                    child_finished,
560                })
561            }
562        })
563    }
564}
565
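// Response delivered to the parent's join set when a child execution finishes.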
566#[derive(Debug, Clone)]
567pub(crate) struct ChildFinishedResponse {
568    pub(crate) parent_execution_id: ExecutionId,
569    pub(crate) parent_join_set: JoinSetId,
570    pub(crate) result: FinishedExecutionResult,
571}
572
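// The primary event to append to an execution's log. When `child_finished` is
// set, the write also delivers a `ChildExecutionFinished` response to the
// parent's join set in a single batch.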
573#[derive(Debug, Clone)]
574pub(crate) struct Append {
575    pub(crate) created_at: DateTime<Utc>,
576    pub(crate) primary_event: AppendRequest,
577    pub(crate) execution_id: ExecutionId,
578    pub(crate) version: Version,
579    pub(crate) child_finished: Option<ChildFinishedResponse>,
580}
581
582impl Append {
583    pub(crate) async fn append(self, db_connection: &impl DbConnection) -> Result<(), DbError> {
584        if let Some(child_finished) = self.child_finished {
585            assert_matches!(
586                &self.primary_event,
587                AppendRequest {
588                    event: ExecutionEventInner::Finished { .. },
589                    ..
590                }
591            );
592            let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
593            db_connection
594                .append_batch_respond_to_parent(
595                    derived.clone(),
596                    self.created_at,
597                    vec![self.primary_event],
598                    self.version.clone(),
599                    child_finished.parent_execution_id,
600                    JoinSetResponseEventOuter {
601                        created_at: self.created_at,
602                        event: JoinSetResponseEvent {
603                            join_set_id: child_finished.parent_join_set,
604                            event: JoinSetResponse::ChildExecutionFinished {
605                                child_execution_id: derived,
606                                // Since self.primary_event is a finished event, the version will remain the same.
607                                finished_version: self.version,
608                                result: child_finished.result,
609                            },
610                        },
611                    },
612                )
613                .await?;
614        } else {
615            db_connection
616                .append_batch(
617                    self.created_at,
618                    vec![self.primary_event],
619                    self.execution_id,
620                    self.version,
621                )
622                .await?;
623        }
624        Ok(())
625    }
626}
627
628#[cfg(any(test, feature = "test"))]
629pub mod simple_worker {
630    use crate::worker::{Worker, WorkerContext, WorkerResult};
631    use async_trait::async_trait;
632    use concepts::{
633        FunctionFqn, FunctionMetadata, ParameterTypes,
634        storage::{HistoryEvent, Version},
635    };
636    use indexmap::IndexMap;
637    use std::sync::Arc;
638    use tracing::trace;
639
640    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");
641    pub type SimpleWorkerResultMap =
642        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;
643
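    // Test worker that pops the last entry of `worker_results_rev` on each run,
    // asserts that the expected version and event history match the context and
    // returns the pre-configured result.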
644    #[derive(Clone, Debug)]
645    pub struct SimpleWorker {
646        pub worker_results_rev: SimpleWorkerResultMap,
647        pub ffqn: FunctionFqn,
648        exported: [FunctionMetadata; 1],
649    }
650
651    impl SimpleWorker {
652        #[must_use]
653        pub fn with_single_result(res: WorkerResult) -> Self {
654            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(IndexMap::from([(
655                Version::new(2),
656                (vec![], res),
657            )]))))
658        }
659
660        #[must_use]
661        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
662            Self {
663                worker_results_rev: self.worker_results_rev,
664                exported: [FunctionMetadata {
665                    ffqn: ffqn.clone(),
666                    parameter_types: ParameterTypes::default(),
667                    return_type: None,
668                    extension: None,
669                    submittable: true,
670                }],
671                ffqn,
672            }
673        }
674
675        #[must_use]
676        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
677            Self {
678                worker_results_rev,
679                ffqn: FFQN_SOME,
680                exported: [FunctionMetadata {
681                    ffqn: FFQN_SOME,
682                    parameter_types: ParameterTypes::default(),
683                    return_type: None,
684                    extension: None,
685                    submittable: true,
686                }],
687            }
688        }
689    }
690
691    #[async_trait]
692    impl Worker for SimpleWorker {
693        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
694            let (expected_version, (expected_eh, worker_result)) =
695                self.worker_results_rev.lock().unwrap().pop().unwrap();
696            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
697            assert_eq!(expected_version, ctx.version);
698            assert_eq!(expected_eh, ctx.event_history);
699            worker_result
700        }
701
702        fn exported_functions(&self) -> &[FunctionMetadata] {
703            &self.exported
704        }
705
706        fn imported_functions(&self) -> &[FunctionMetadata] {
707            &[]
708        }
709    }
710}
711
712#[cfg(test)]
713mod tests {
714    use self::simple_worker::SimpleWorker;
715    use super::*;
716    use crate::worker::FatalError;
717    use crate::{expired_timers_watcher, worker::WorkerResult};
718    use assert_matches::assert_matches;
719    use async_trait::async_trait;
720    use concepts::storage::{CreateRequest, JoinSetRequest};
721    use concepts::storage::{
722        DbConnection, ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState,
723    };
724    use concepts::time::Now;
725    use concepts::{
726        ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, StrVariant,
727        SupportedFunctionReturnValue, TrapKind,
728    };
729    use db_tests::Database;
730    use indexmap::IndexMap;
731    use simple_worker::FFQN_SOME;
732    use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
733    use test_utils::set_up;
734    use test_utils::sim_clock::{ConstClock, SimClock};
735
736    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
737
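    // Runs a single tick with a fresh `ExecTask` and polls every 10 ms until
    // all spawned worker tasks have finished.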
738    async fn tick_fn<
739        W: Worker + Debug,
740        C: ClockFn + 'static,
741        DB: DbConnection + 'static,
742        P: DbPool<DB> + 'static,
743    >(
744        config: ExecConfig,
745        clock_fn: C,
746        db_pool: P,
747        worker: Arc<W>,
748        executed_at: DateTime<Utc>,
749    ) -> ExecutionProgress {
750        trace!("Ticking with {worker:?}");
751        let ffqns = super::extract_ffqns(worker.as_ref());
752        let executor = ExecTask::new(worker, config, clock_fn, db_pool, ffqns);
753        let mut execution_progress = executor.tick(executed_at).await.unwrap();
754        loop {
755            execution_progress
756                .executions
757                .retain(|(_, abort_handle)| !abort_handle.is_finished());
758            if execution_progress.executions.is_empty() {
759                return execution_progress;
760            }
761            tokio::time::sleep(Duration::from_millis(10)).await;
762        }
763    }
764
765    #[tokio::test]
766    async fn execute_simple_lifecycle_tick_based_mem() {
767        let created_at = Now.now();
768        let (_guard, db_pool) = Database::Memory.set_up().await;
769        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
770        db_pool.close().await.unwrap();
771    }
772
773    #[cfg(not(madsim))]
774    #[tokio::test]
775    async fn execute_simple_lifecycle_tick_based_sqlite() {
776        let created_at = Now.now();
777        let (_guard, db_pool) = Database::Sqlite.set_up().await;
778        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
779        db_pool.close().await.unwrap();
780    }
781
782    async fn execute_simple_lifecycle_tick_based<
783        DB: DbConnection + 'static,
784        P: DbPool<DB> + 'static,
785        C: ClockFn + 'static,
786    >(
787        pool: P,
788        clock_fn: C,
789    ) {
790        set_up();
791        let created_at = clock_fn.now();
792        let exec_config = ExecConfig {
793            batch_size: 1,
794            lock_expiry: Duration::from_secs(1),
795            tick_sleep: Duration::from_millis(100),
796            component_id: ComponentId::dummy_activity(),
797            task_limiter: None,
798        };
799
800        let execution_log = create_and_tick(
801            CreateAndTickConfig {
802                execution_id: ExecutionId::generate(),
803                created_at,
804                max_retries: 0,
805                executed_at: created_at,
806                retry_exp_backoff: Duration::ZERO,
807            },
808            clock_fn,
809            pool,
810            exec_config,
811            Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
812                SupportedFunctionReturnValue::None,
813                Version::new(2),
814                None,
815            ))),
816            tick_fn,
817        )
818        .await;
819        assert_matches!(
820            execution_log.events.get(2).unwrap(),
821            ExecutionEvent {
822                event: ExecutionEventInner::Finished {
823                    result: Ok(SupportedFunctionReturnValue::None),
824                    http_client_traces: None
825                },
826                created_at: _,
827                backtrace_id: None,
828            }
829        );
830    }
831
832    #[tokio::test]
833    async fn stochastic_execute_simple_lifecycle_task_based_mem() {
834        set_up();
835        let created_at = Now.now();
836        let clock_fn = ConstClock(created_at);
837        let (_guard, db_pool) = Database::Memory.set_up().await;
838        let exec_config = ExecConfig {
839            batch_size: 1,
840            lock_expiry: Duration::from_secs(1),
841            tick_sleep: Duration::ZERO,
842            component_id: ComponentId::dummy_activity(),
843            task_limiter: None,
844        };
845
846        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
847            SupportedFunctionReturnValue::None,
848            Version::new(2),
849            None,
850        )));
851        let exec_task = ExecTask::spawn_new(
852            worker.clone(),
853            exec_config.clone(),
854            clock_fn,
855            db_pool.clone(),
856            ExecutorId::generate(),
857        );
858
859        let execution_log = create_and_tick(
860            CreateAndTickConfig {
861                execution_id: ExecutionId::generate(),
862                created_at,
863                max_retries: 0,
864                executed_at: created_at,
865                retry_exp_backoff: Duration::ZERO,
866            },
867            clock_fn,
868            db_pool.clone(),
869            exec_config,
870            worker,
871            |_, _, _, _, _| async {
872                tokio::time::sleep(Duration::from_secs(1)).await; // non-deterministic if not run in madsim
873                ExecutionProgress::default()
874            },
875        )
876        .await;
877        exec_task.close().await;
878        db_pool.close().await.unwrap();
879        assert_matches!(
880            execution_log.events.get(2).unwrap(),
881            ExecutionEvent {
882                event: ExecutionEventInner::Finished {
883                    result: Ok(SupportedFunctionReturnValue::None),
884                    http_client_traces: None
885                },
886                created_at: _,
887                backtrace_id: None,
888            }
889        );
890    }
891
892    struct CreateAndTickConfig {
893        execution_id: ExecutionId,
894        created_at: DateTime<Utc>,
895        max_retries: u32,
896        executed_at: DateTime<Utc>,
897        retry_exp_backoff: Duration,
898    }
899
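    // Creates an execution, runs the provided `tick` closure once and returns
    // the resulting log after asserting that it starts with Created and Locked
    // events followed by at least one more event.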
900    async fn create_and_tick<
901        W: Worker,
902        C: ClockFn,
903        DB: DbConnection,
904        P: DbPool<DB>,
905        T: FnMut(ExecConfig, C, P, Arc<W>, DateTime<Utc>) -> F,
906        F: Future<Output = ExecutionProgress>,
907    >(
908        config: CreateAndTickConfig,
909        clock_fn: C,
910        db_pool: P,
911        exec_config: ExecConfig,
912        worker: Arc<W>,
913        mut tick: T,
914    ) -> ExecutionLog {
915        // Create an execution
916        let db_connection = db_pool.connection();
917        db_connection
918            .create(CreateRequest {
919                created_at: config.created_at,
920                execution_id: config.execution_id.clone(),
921                ffqn: FFQN_SOME,
922                params: Params::empty(),
923                parent: None,
924                metadata: concepts::ExecutionMetadata::empty(),
925                scheduled_at: config.created_at,
926                retry_exp_backoff: config.retry_exp_backoff,
927                max_retries: config.max_retries,
928                component_id: ComponentId::dummy_activity(),
929                scheduled_by: None,
930            })
931            .await
932            .unwrap();
933        // execute!
934        tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
935        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
936        debug!("Execution history after tick: {execution_log:?}");
937        // check that DB contains Created and Locked events.
938        assert_matches!(
939            execution_log.events.first().unwrap(),
940            ExecutionEvent {
941                event: ExecutionEventInner::Created { .. },
942                created_at: actually_created_at,
943                backtrace_id: None,
944            }
945            if config.created_at == *actually_created_at
946        );
947        let locked_at = assert_matches!(
948            execution_log.events.get(1).unwrap(),
949            ExecutionEvent {
950                event: ExecutionEventInner::Locked { .. },
951                created_at: locked_at,
952                backtrace_id: None,
953            } if config.created_at <= *locked_at
954            => *locked_at
955        );
956        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
957            event: _,
958            created_at: executed_at,
959            backtrace_id: None,
960        } if *executed_at >= locked_at);
961        execution_log
962    }
963
964    #[expect(clippy::too_many_lines)]
965    #[tokio::test]
966    async fn activity_trap_should_trigger_an_execution_retry() {
967        set_up();
968        let sim_clock = SimClock::default();
969        let (_guard, db_pool) = Database::Memory.set_up().await;
970        let exec_config = ExecConfig {
971            batch_size: 1,
972            lock_expiry: Duration::from_secs(1),
973            tick_sleep: Duration::ZERO,
974            component_id: ComponentId::dummy_activity(),
975            task_limiter: None,
976        };
977        let expected_reason = "error reason";
978        let expected_detail = "error detail";
979        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
980            WorkerError::ActivityTrap {
981                reason: expected_reason.to_string(),
982                trap_kind: concepts::TrapKind::Trap,
983                detail: expected_detail.to_string(),
984                version: Version::new(2),
985                http_client_traces: None,
986            },
987        )));
988        let retry_exp_backoff = Duration::from_millis(100);
989        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
990        let execution_log = create_and_tick(
991            CreateAndTickConfig {
992                execution_id: ExecutionId::generate(),
993                created_at: sim_clock.now(),
994                max_retries: 1,
995                executed_at: sim_clock.now(),
996                retry_exp_backoff,
997            },
998            sim_clock.clone(),
999            db_pool.clone(),
1000            exec_config.clone(),
1001            worker,
1002            tick_fn,
1003        )
1004        .await;
1005        assert_eq!(3, execution_log.events.len());
1006        {
1007            let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
1008                &execution_log.events.get(2).unwrap(),
1009                ExecutionEvent {
1010                    event: ExecutionEventInner::TemporarilyFailed {
1011                        reason_inner,
1012                        reason_full,
1013                        detail,
1014                        backoff_expires_at,
1015                        http_client_traces: None,
1016                    },
1017                    created_at: at,
1018                    backtrace_id: None,
1019                }
1020                => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
1021            );
1022            assert_eq!(expected_reason, reason_inner.deref());
1023            assert_eq!(
1024                format!("activity trap: {expected_reason}"),
1025                reason_full.deref()
1026            );
1027            assert_eq!(Some(expected_detail), detail.as_deref());
1028            assert_eq!(at, sim_clock.now());
1029            assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
1030        }
1031        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
1032            std::sync::Mutex::new(IndexMap::from([(
1033                Version::new(4),
1034                (
1035                    vec![],
1036                    WorkerResult::Ok(SupportedFunctionReturnValue::None, Version::new(4), None),
1037                ),
1038            )])),
1039        )));
1040        // noop until `retry_exp_backoff` expires
1041        assert!(
1042            tick_fn(
1043                exec_config.clone(),
1044                sim_clock.clone(),
1045                db_pool.clone(),
1046                worker.clone(),
1047                sim_clock.now(),
1048            )
1049            .await
1050            .executions
1051            .is_empty()
1052        );
1053        // tick again to finish the execution
1054        sim_clock.move_time_forward(retry_exp_backoff).await;
1055        tick_fn(
1056            exec_config,
1057            sim_clock.clone(),
1058            db_pool.clone(),
1059            worker,
1060            sim_clock.now(),
1061        )
1062        .await;
1063        let execution_log = {
1064            let db_connection = db_pool.connection();
1065            db_connection
1066                .get(&execution_log.execution_id)
1067                .await
1068                .unwrap()
1069        };
1070        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
1071        assert_matches!(
1072            execution_log.events.get(3).unwrap(),
1073            ExecutionEvent {
1074                event: ExecutionEventInner::Locked { .. },
1075                created_at: at,
1076                backtrace_id: None,
1077            } if *at == sim_clock.now()
1078        );
1079        assert_matches!(
1080            execution_log.events.get(4).unwrap(),
1081            ExecutionEvent {
1082                event: ExecutionEventInner::Finished {
1083                    result: Ok(SupportedFunctionReturnValue::None),
1084                    http_client_traces: None
1085                },
1086                created_at: finished_at,
1087                backtrace_id: None,
1088            } if *finished_at == sim_clock.now()
1089        );
1090        db_pool.close().await.unwrap();
1091    }
1092
1093    #[tokio::test]
1094    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1095        set_up();
1096        let created_at = Now.now();
1097        let clock_fn = ConstClock(created_at);
1098        let (_guard, db_pool) = Database::Memory.set_up().await;
1099        let exec_config = ExecConfig {
1100            batch_size: 1,
1101            lock_expiry: Duration::from_secs(1),
1102            tick_sleep: Duration::ZERO,
1103            component_id: ComponentId::dummy_activity(),
1104            task_limiter: None,
1105        };
1106
1107        let expected_reason = "error reason";
1108        let expected_detail = "error detail";
1109        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1110            WorkerError::ActivityTrap {
1111                reason: expected_reason.to_string(),
1112                trap_kind: concepts::TrapKind::Trap,
1113                detail: expected_detail.to_string(),
1114                version: Version::new(2),
1115                http_client_traces: None,
1116            },
1117        )));
1118        let execution_log = create_and_tick(
1119            CreateAndTickConfig {
1120                execution_id: ExecutionId::generate(),
1121                created_at,
1122                max_retries: 0,
1123                executed_at: created_at,
1124                retry_exp_backoff: Duration::ZERO,
1125            },
1126            clock_fn,
1127            db_pool.clone(),
1128            exec_config.clone(),
1129            worker,
1130            tick_fn,
1131        )
1132        .await;
1133        assert_eq!(3, execution_log.events.len());
1134        let (reason, kind, detail) = assert_matches!(
1135            &execution_log.events.get(2).unwrap(),
1136            ExecutionEvent {
1137                event: ExecutionEventInner::Finished{
1138                    result: Err(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_}),
1139                    http_client_traces: None
1140                },
1141                created_at: at,
1142                backtrace_id: None,
1143            } if *at == created_at
1144            => (reason_inner, kind, detail)
1145        );
1146        assert_eq!(expected_reason, *reason);
1147        assert_eq!(Some(expected_detail), detail.as_deref());
1148        assert_eq!(PermanentFailureKind::ActivityTrap, *kind);
1149
1150        db_pool.close().await.unwrap();
1151    }
1152
1153    #[tokio::test]
1154    async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1155        let worker_error = WorkerError::ActivityTrap {
1156            reason: "error reason".to_string(),
1157            trap_kind: TrapKind::Trap,
1158            detail: "detail".to_string(),
1159            version: Version::new(2),
1160            http_client_traces: None,
1161        };
1162        let expected_child_err = FinishedExecutionError::PermanentFailure {
1163            reason_full: "activity trap: error reason".to_string(),
1164            reason_inner: "error reason".to_string(),
1165            kind: PermanentFailureKind::ActivityTrap,
1166            detail: Some("detail".to_string()),
1167        };
1168        child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
1169            .await;
1170    }
1171
1172    // TODO: Add test for WorkerError::TemporaryTimeout
1173
1174    #[tokio::test]
1175    async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1176        let worker_error = WorkerError::TemporaryTimeoutHandledByWatcher;
1177        let expected_child_err = FinishedExecutionError::PermanentTimeout;
1178        child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
1179            .await;
1180    }
1181
1182    #[tokio::test]
1183    async fn child_execution_permanently_failed_should_notify_parent_unhandled_child() {
1184        let parent_id = ExecutionId::from_parts(1, 1);
1185        let join_set_id_outer =
1186            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("outer")).unwrap();
1187        let root_cause_id = parent_id.next_level(&join_set_id_outer);
1188        let join_set_id_inner =
1189            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("inner")).unwrap();
1190        let child_execution_id = root_cause_id.next_level(&join_set_id_inner);
1191        let worker_error = WorkerError::FatalError(
1192            FatalError::UnhandledChildExecutionError {
1193                child_execution_id: child_execution_id.clone(),
1194                root_cause_id: root_cause_id.clone(),
1195            },
1196            Version::new(2),
1197        );
1198        let expected_child_err = FinishedExecutionError::UnhandledChildExecutionError {
1199            child_execution_id,
1200            root_cause_id,
1201        };
1202        child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
1203            .await;
1204    }
1205
1206    #[expect(clippy::too_many_lines)]
1207    async fn child_execution_permanently_failed_should_notify_parent(
1208        worker_error: WorkerError,
1209        expected_child_err: FinishedExecutionError,
1210    ) {
1211        use concepts::storage::JoinSetResponseEventOuter;
1212        const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1213
1214        set_up();
1215        let sim_clock = SimClock::default();
1216        let (_guard, db_pool) = Database::Memory.set_up().await;
1217
1218        let parent_worker = Arc::new(SimpleWorker::with_single_result(
1219            WorkerResult::DbUpdatedByWorker,
1220        ));
1221        let parent_execution_id = ExecutionId::generate();
1222        db_pool
1223            .connection()
1224            .create(CreateRequest {
1225                created_at: sim_clock.now(),
1226                execution_id: parent_execution_id.clone(),
1227                ffqn: FFQN_SOME,
1228                params: Params::empty(),
1229                parent: None,
1230                metadata: concepts::ExecutionMetadata::empty(),
1231                scheduled_at: sim_clock.now(),
1232                retry_exp_backoff: Duration::ZERO,
1233                max_retries: 0,
1234                component_id: ComponentId::dummy_activity(),
1235                scheduled_by: None,
1236            })
1237            .await
1238            .unwrap();
1239        tick_fn(
1240            ExecConfig {
1241                batch_size: 1,
1242                lock_expiry: LOCK_EXPIRY,
1243                tick_sleep: Duration::ZERO,
1244                component_id: ComponentId::dummy_activity(),
1245                task_limiter: None,
1246            },
1247            sim_clock.clone(),
1248            db_pool.clone(),
1249            parent_worker,
1250            sim_clock.now(),
1251        )
1252        .await;
1253
1254        let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1255        let child_execution_id = parent_execution_id.next_level(&join_set_id);
1256        // The executor does not append anything here; the following should have been written by the worker:
1257        {
1258            let child = CreateRequest {
1259                created_at: sim_clock.now(),
1260                execution_id: ExecutionId::Derived(child_execution_id.clone()),
1261                ffqn: FFQN_CHILD,
1262                params: Params::empty(),
1263                parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1264                metadata: concepts::ExecutionMetadata::empty(),
1265                scheduled_at: sim_clock.now(),
1266                retry_exp_backoff: Duration::ZERO,
1267                max_retries: 0,
1268                component_id: ComponentId::dummy_activity(),
1269                scheduled_by: None,
1270            };
1271            let current_time = sim_clock.now();
1272            let join_set = AppendRequest {
1273                created_at: current_time,
1274                event: ExecutionEventInner::HistoryEvent {
1275                    event: HistoryEvent::JoinSetCreate {
1276                        join_set_id: join_set_id.clone(),
1277                        closing_strategy: ClosingStrategy::Complete,
1278                    },
1279                },
1280            };
1281            let child_exec_req = AppendRequest {
1282                created_at: current_time,
1283                event: ExecutionEventInner::HistoryEvent {
1284                    event: HistoryEvent::JoinSetRequest {
1285                        join_set_id: join_set_id.clone(),
1286                        request: JoinSetRequest::ChildExecutionRequest {
1287                            child_execution_id: child_execution_id.clone(),
1288                        },
1289                    },
1290                },
1291            };
1292            let join_next = AppendRequest {
1293                created_at: current_time,
1294                event: ExecutionEventInner::HistoryEvent {
1295                    event: HistoryEvent::JoinNext {
1296                        join_set_id: join_set_id.clone(),
1297                        run_expires_at: sim_clock.now(),
1298                        closing: false,
1299                    },
1300                },
1301            };
1302            db_pool
1303                .connection()
1304                .append_batch_create_new_execution(
1305                    current_time,
1306                    vec![join_set, child_exec_req, join_next],
1307                    parent_execution_id.clone(),
1308                    Version::new(2),
1309                    vec![child],
1310                )
1311                .await
1312                .unwrap();
1313        }
1314
1315        let child_worker = Arc::new(
1316            SimpleWorker::with_single_result(WorkerResult::Err(worker_error)).with_ffqn(FFQN_CHILD),
1317        );
1318
1319        // execute the child
1320        tick_fn(
1321            ExecConfig {
1322                batch_size: 1,
1323                lock_expiry: LOCK_EXPIRY,
1324                tick_sleep: Duration::ZERO,
1325                component_id: ComponentId::dummy_activity(),
1326                task_limiter: None,
1327            },
1328            sim_clock.clone(),
1329            db_pool.clone(),
1330            child_worker,
1331            sim_clock.now(),
1332        )
1333        .await;
1334        if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
1335            // In case of timeout, let the timers watcher handle it
1336            sim_clock.move_time_forward(LOCK_EXPIRY).await;
1337            expired_timers_watcher::tick(&db_pool.connection(), sim_clock.now())
1338                .await
1339                .unwrap();
1340        }
1341        let child_log = db_pool
1342            .connection()
1343            .get(&ExecutionId::Derived(child_execution_id.clone()))
1344            .await
1345            .unwrap();
1346        assert!(child_log.pending_state.is_finished());
1347        assert_eq!(
1348            Version(2),
1349            child_log.next_version,
1350            "created = 0, locked = 1, with_single_result = 2"
1351        );
1352        assert_eq!(
1353            ExecutionEventInner::Finished {
1354                result: Err(expected_child_err),
1355                http_client_traces: None
1356            },
1357            child_log.last_event().event
1358        );
        let parent_log = db_pool
            .connection()
            .get(&parent_execution_id)
            .await
            .unwrap();
        assert_matches!(
            parent_log.pending_state,
            PendingState::PendingAt {
                scheduled_at
            } if scheduled_at == sim_clock.now(),
            "parent should be back to pending"
        );
        let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
            parent_log.responses.last(),
            Some(JoinSetResponseEventOuter {
                created_at: at,
                event: JoinSetResponseEvent {
                    join_set_id: found_join_set_id,
                    event: JoinSetResponse::ChildExecutionFinished {
                        child_execution_id: found_child_execution_id,
                        finished_version,
                        result: found_result,
                    }
                }
            })
            if *at == sim_clock.now()
            => (found_join_set_id, found_child_execution_id, finished_version, found_result)
        );
        assert_eq!(join_set_id, *found_join_set_id);
        assert_eq!(child_execution_id, *found_child_execution_id);
        assert_eq!(child_log.next_version, *child_finished_version);
        assert!(found_result.is_err());

        db_pool.close().await.unwrap();
    }

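    /// Test worker that sleeps for `duration` before returning `result`;
    /// used to simulate an execution that outlives its lock.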
    #[derive(Clone, Debug)]
    struct SleepyWorker {
        duration: Duration,
        result: SupportedFunctionReturnValue,
        exported: [FunctionMetadata; 1],
    }

    #[async_trait]
    impl Worker for SleepyWorker {
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            tokio::time::sleep(self.duration).await;
            WorkerResult::Ok(self.result.clone(), ctx.version, None)
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }

        fn imported_functions(&self) -> &[FunctionMetadata] {
            &[]
        }
    }

    #[expect(clippy::too_many_lines)]
    #[tokio::test]
    async fn hanging_lock_should_be_cleaned_and_execution_retried() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let lock_expiry = Duration::from_millis(100);
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry,
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };

        let worker = Arc::new(SleepyWorker {
            duration: lock_expiry + Duration::from_millis(1), // Sleep for more than the lock expiry allows.
            result: SupportedFunctionReturnValue::None,
            exported: [FunctionMetadata {
                ffqn: FFQN_SOME,
                parameter_types: ParameterTypes::default(),
                return_type: None,
                extension: None,
                submittable: true,
            }],
        });
        // Create an execution that allows a single retry after a 300 ms backoff.
        let execution_id = ExecutionId::generate();
        let timeout_duration = Duration::from_millis(300);
        let db_connection = db_pool.connection();
        db_connection
            .create(CreateRequest {
                created_at: sim_clock.now(),
                execution_id: execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: timeout_duration,
                max_retries: 1,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();

        let ffqns = super::extract_ffqns(worker.as_ref());
        let executor = ExecTask::new(
            worker,
            exec_config,
            sim_clock.clone(),
            db_pool.clone(),
            ffqns,
        );
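        // The first tick picks up and locks the execution; the SleepyWorker then sleeps past the lock expiry.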
        let mut first_execution_progress = executor.tick(sim_clock.now()).await.unwrap();
        assert_eq!(1, first_execution_progress.executions.len());
        // Started hanging, wait for lock expiry.
        sim_clock.move_time_forward(lock_expiry).await;
        // A cleanup tick after the lock expiry must not produce any new executions.
        let now_after_first_lock_expiry = sim_clock.now();
        {
            debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
            let cleanup_progress = executor.tick(now_after_first_lock_expiry).await.unwrap();
            assert!(cleanup_progress.executions.is_empty());
        }
        {
            let expired_locks =
                expired_timers_watcher::tick(&db_pool.connection(), now_after_first_lock_expiry)
                    .await
                    .unwrap()
                    .expired_locks;
            assert_eq!(1, expired_locks);
        }
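        // The spawned worker task is still sleeping; only its lock has expired.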
        assert!(
            !first_execution_progress
                .executions
                .pop()
                .unwrap()
                .1
                .is_finished()
        );

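        // The expired lock is recorded as a temporary timeout and the execution becomes pending again after the backoff.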
        let execution_log = db_connection.get(&execution_id).await.unwrap();
        let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
        assert_matches!(
            &execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
                created_at: at,
                backtrace_id: None,
            } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
        );
        assert_eq!(
            PendingState::PendingAt {
                scheduled_at: expected_first_timeout_expiry
            },
            execution_log.pending_state
        );
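        // Advance past the retry backoff so the execution becomes pending and is picked up again.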
        sim_clock.move_time_forward(timeout_duration).await;
        let now_after_first_timeout = sim_clock.now();
        debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");

        let mut second_execution_progress = executor.tick(now_after_first_timeout).await.unwrap();
        assert_eq!(1, second_execution_progress.executions.len());

        // Started hanging, wait for lock expiry.
        sim_clock.move_time_forward(lock_expiry).await;
        // A cleanup tick after the lock expiry must not produce any new executions.
        let now_after_second_lock_expiry = sim_clock.now();
        debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
        {
            let cleanup_progress = executor.tick(now_after_second_lock_expiry).await.unwrap();
            assert!(cleanup_progress.executions.is_empty());
        }
        {
            let expired_locks =
                expired_timers_watcher::tick(&db_pool.connection(), now_after_second_lock_expiry)
                    .await
                    .unwrap()
                    .expired_locks;
            assert_eq!(1, expired_locks);
        }
        assert!(
            !second_execution_progress
                .executions
                .pop()
                .unwrap()
                .1
                .is_finished()
        );

        drop(db_connection);
        drop(executor);
        db_pool.close().await.unwrap();
    }
}