obeli_sk_executor/
executor.rs

1use crate::worker::{FatalError, Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::prefixed_ulid::RunId;
5use concepts::storage::{
6    AppendEventsToExecution, AppendRequest, AppendResponseToExecution, DbErrorGeneric,
7    DbErrorWrite, DbExecutor, ExecutionLog, LockedExecution,
8};
9use concepts::time::{ClockFn, Sleep};
10use concepts::{
11    ComponentId, ComponentRetryConfig, FunctionMetadata, StrVariant, SupportedFunctionReturnValue,
12};
13use concepts::{ExecutionFailureKind, JoinSetId};
14use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
15use concepts::{
16    FinishedExecutionError,
17    storage::{ExecutionEventInner, Version},
18};
19use std::{
20    sync::{
21        Arc,
22        atomic::{AtomicBool, Ordering},
23    },
24    time::Duration,
25};
26use tokio::task::{AbortHandle, JoinHandle};
27use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
28
/// Configuration of a single executor (see `ExecTask`).
#[derive(Debug, Clone)]
pub struct ExecConfig {
    /// Added to the tick time to compute the `lock_expires_at` passed to `lock_pending`.
    pub lock_expiry: Duration,
    /// Duration of the sleep future handed to `wait_for_pending` between ticks of the
    /// spawned executor loop (presumably the maximum idle wait — confirm).
    pub tick_sleep: Duration,
    /// Maximum number of executions locked per tick.
    pub batch_size: u32,
    /// Passed to `lock_pending` and recorded in tracing spans.
    pub component_id: ComponentId,
    /// Optional semaphore bounding concurrently running worker tasks. When set, a tick
    /// only locks as many executions as permits are immediately available, and each
    /// spawned worker task holds its permit until it finishes.
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
    /// Passed to `lock_pending` and recorded in tracing spans.
    pub executor_id: ExecutorId,
    /// Retry policy (`max_retries`, `retry_exp_backoff`) used to decide whether a failed
    /// run may be retried and how long to back off.
    pub retry_config: ComponentRetryConfig,
}
39
/// An executor: locks pending executions of `ffqns` in batches and runs each of them
/// on `worker`, persisting the outcome through `db_exec`.
pub struct ExecTask<C: ClockFn> {
    worker: Arc<dyn Worker>,
    config: ExecConfig,
    clock_fn: C, // Used for obtaining current time when the execution finishes.
    db_exec: Arc<dyn DbExecutor>,
    /// Functions whose pending executions this executor locks (passed to `lock_pending`).
    ffqns: Arc<[FunctionFqn]>,
}
47
/// Worker tasks spawned by a single tick, each paired with the execution it runs.
#[derive(derive_more::Debug, Default)]
pub struct ExecutionProgress {
    #[debug(skip)]
    #[allow(dead_code)] // Only read by the `test`-feature `wait_for_tasks`.
    executions: Vec<(ExecutionId, JoinHandle<()>)>,
}
54
55impl ExecutionProgress {
56    #[cfg(feature = "test")]
57    pub async fn wait_for_tasks(self) -> Vec<ExecutionId> {
58        let mut vec = Vec::new();
59        for (exe, join_handle) in self.executions {
60            vec.push(exe);
61            join_handle.await.unwrap();
62        }
63        vec
64    }
65}
66
/// Handle to an executor loop spawned by `ExecTask::spawn_new`.
///
/// `close` requests a graceful stop; dropping the handle aborts the loop if it is
/// still running.
#[derive(derive_more::Debug)]
pub struct ExecutorTaskHandle {
    /// Set by `close`; the executor loop checks it before each tick.
    #[debug(skip)]
    is_closing: Arc<AtomicBool>,
    /// Abort handle of the spawned executor loop task.
    #[debug(skip)]
    abort_handle: AbortHandle,
    component_id: ComponentId,
    executor_id: ExecutorId,
}
76
77impl ExecutorTaskHandle {
78    #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
79    pub async fn close(&self) {
80        trace!("Gracefully closing");
81        // TODO: Close all the workers tasks as well.
82        // Simple solution:
83        // Attempt to unlock all in-progress executions. Activities will not be penalized for shutdown, although the number of retries would increase,
84        // some progress will be lost.
85        // More complex solution:
86        // Activities will be awaited, up to some deadline, then unlocked.
87        // Workflows that are awating should be signaled using `DeadlineTracker`.
88        self.is_closing.store(true, Ordering::Relaxed);
89        while !self.abort_handle.is_finished() {
90            tokio::time::sleep(Duration::from_millis(1)).await;
91        }
92        debug!("Gracefully closed");
93    }
94}
95
96impl Drop for ExecutorTaskHandle {
97    #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
98    fn drop(&mut self) {
99        if self.abort_handle.is_finished() {
100            return;
101        }
102        warn!("Aborting the executor task");
103        self.abort_handle.abort();
104    }
105}
106
107#[cfg(feature = "test")]
108pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
109    extract_exported_ffqns_noext(worker)
110}
111
112fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
113    worker
114        .exported_functions()
115        .iter()
116        .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
117        .collect::<Arc<_>>()
118}
119
120impl<C: ClockFn + 'static> ExecTask<C> {
121    #[cfg(feature = "test")]
122    pub fn new_test(
123        worker: Arc<dyn Worker>,
124        config: ExecConfig,
125        clock_fn: C,
126        db_exec: Arc<dyn DbExecutor>,
127        ffqns: Arc<[FunctionFqn]>,
128    ) -> Self {
129        Self {
130            worker,
131            config,
132            clock_fn,
133            db_exec,
134            ffqns,
135        }
136    }
137
138    #[cfg(feature = "test")]
139    pub fn new_all_ffqns_test(
140        worker: Arc<dyn Worker>,
141        config: ExecConfig,
142        clock_fn: C,
143        db_exec: Arc<dyn DbExecutor>,
144    ) -> Self {
145        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
146        Self {
147            worker,
148            config,
149            clock_fn,
150            db_exec,
151            ffqns,
152        }
153    }
154
155    pub fn spawn_new(
156        worker: Arc<dyn Worker>,
157        config: ExecConfig,
158        clock_fn: C,
159        db_exec: Arc<dyn DbExecutor>,
160        sleep: impl Sleep + 'static,
161    ) -> ExecutorTaskHandle {
162        let is_closing = Arc::new(AtomicBool::default());
163        let is_closing_inner = is_closing.clone();
164        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
165        let component_id = config.component_id.clone();
166        let executor_id = config.executor_id;
167        let abort_handle = tokio::spawn(async move {
168            debug!(executor_id = %config.executor_id, component_id = %config.component_id, "Spawned executor");
169            let task = ExecTask {
170                worker,
171                config,
172                db_exec,
173                ffqns: ffqns.clone(),
174                clock_fn: clock_fn.clone(),
175            };
176            while !is_closing_inner.load(Ordering::Relaxed) {
177                let _ = task.tick(clock_fn.now(), RunId::generate()).await;
178
179                task.db_exec
180                    .wait_for_pending(clock_fn.now(), ffqns.clone(), {
181                        let sleep = sleep.clone();
182                        Box::pin(async move { sleep.sleep(task.config.tick_sleep).await })})
183                    .await;
184            }
185        })
186        .abort_handle();
187        ExecutorTaskHandle {
188            is_closing,
189            abort_handle,
190            component_id,
191            executor_id,
192        }
193    }
194
195    fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
196        if let Some(task_limiter) = &self.config.task_limiter {
197            let mut locks = Vec::new();
198            for _ in 0..self.config.batch_size {
199                if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
200                    locks.push(Some(permit));
201                } else {
202                    break;
203                }
204            }
205            locks
206        } else {
207            let mut vec = Vec::with_capacity(self.config.batch_size as usize);
208            for _ in 0..self.config.batch_size {
209                vec.push(None);
210            }
211            vec
212        }
213    }
214
215    #[cfg(feature = "test")]
216    pub async fn tick_test(&self, executed_at: DateTime<Utc>, run_id: RunId) -> ExecutionProgress {
217        self.tick(executed_at, run_id).await.unwrap()
218    }
219
220    #[cfg(feature = "test")]
221    pub async fn tick_test_await(
222        &self,
223        executed_at: DateTime<Utc>,
224        run_id: RunId,
225    ) -> Vec<ExecutionId> {
226        self.tick(executed_at, run_id)
227            .await
228            .unwrap()
229            .wait_for_tasks()
230            .await
231    }
232
233    #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.config.executor_id, component_id = %self.config.component_id))]
234    async fn tick(
235        &self,
236        executed_at: DateTime<Utc>,
237        run_id: RunId,
238    ) -> Result<ExecutionProgress, DbErrorGeneric> {
239        let locked_executions = {
240            let mut permits = self.acquire_task_permits();
241            if permits.is_empty() {
242                return Ok(ExecutionProgress::default());
243            }
244            let lock_expires_at = executed_at + self.config.lock_expiry;
245            let locked_executions = self
246                .db_exec
247                .lock_pending(
248                    permits.len(), // batch size
249                    executed_at,   // fetch expiring before now
250                    self.ffqns.clone(),
251                    executed_at, // created at
252                    self.config.component_id.clone(),
253                    self.config.executor_id,
254                    lock_expires_at,
255                    run_id,
256                    self.config.retry_config,
257                )
258                .await?;
259            // Drop permits if too many were allocated.
260            while permits.len() > locked_executions.len() {
261                permits.pop();
262            }
263            assert_eq!(permits.len(), locked_executions.len());
264            locked_executions.into_iter().zip(permits)
265        };
266
267        let mut executions = Vec::with_capacity(locked_executions.len());
268        for (locked_execution, permit) in locked_executions {
269            let execution_id = locked_execution.execution_id.clone();
270            let join_handle = {
271                let worker = self.worker.clone();
272                let db = self.db_exec.clone();
273                let clock_fn = self.clock_fn.clone();
274                let worker_span = info_span!(parent: None, "worker",
275                    "otel.name" = format!("worker {}", locked_execution.ffqn),
276                    %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.config.executor_id, component_id = %self.config.component_id);
277                locked_execution.metadata.enrich(&worker_span);
278                tokio::spawn({
279                    let worker_span2 = worker_span.clone();
280                    let retry_config = self.config.retry_config;
281                    async move {
282                        let _permit = permit;
283                        let res = Self::run_worker(
284                            worker,
285                            db.as_ref(),
286                            clock_fn,
287                            locked_execution,
288                            retry_config,
289                            worker_span2,
290                        )
291                        .await;
292                        if let Err(db_error) = res {
293                            error!("Got db error `{db_error:?}`, expecting watcher to mark execution as timed out");
294                        }
295                    }
296                    .instrument(worker_span)
297                })
298            };
299            executions.push((execution_id, join_handle));
300        }
301        Ok(ExecutionProgress { executions })
302    }
303
304    async fn run_worker(
305        worker: Arc<dyn Worker>,
306        db_exec: &dyn DbExecutor,
307        clock_fn: C,
308        locked_execution: LockedExecution,
309        retry_config: ComponentRetryConfig,
310        worker_span: Span,
311    ) -> Result<(), DbErrorWrite> {
312        debug!("Worker::run starting");
313        trace!(
314            version = %locked_execution.next_version,
315            params = ?locked_execution.params,
316            event_history = ?locked_execution.event_history,
317            "Worker::run starting"
318        );
319        let can_be_retried = ExecutionLog::can_be_retried_after(
320            locked_execution.intermittent_event_count + 1,
321            retry_config.max_retries,
322            retry_config.retry_exp_backoff,
323        );
324        let unlock_expiry_on_limit_reached =
325            ExecutionLog::compute_retry_duration_when_retrying_forever(
326                locked_execution.intermittent_event_count + 1,
327                retry_config.retry_exp_backoff,
328            );
329        let ctx = WorkerContext {
330            execution_id: locked_execution.execution_id.clone(),
331            metadata: locked_execution.metadata,
332            ffqn: locked_execution.ffqn,
333            params: locked_execution.params,
334            event_history: locked_execution.event_history,
335            responses: locked_execution
336                .responses
337                .into_iter()
338                .map(|outer| outer.event)
339                .collect(),
340            version: locked_execution.next_version,
341            can_be_retried: can_be_retried.is_some(),
342            locked_event: locked_execution.locked_event,
343            worker_span,
344        };
345        let worker_result = worker.run(ctx).await;
346        trace!(?worker_result, "Worker::run finished");
347        let result_obtained_at = clock_fn.now();
348        match Self::worker_result_to_execution_event(
349            locked_execution.execution_id,
350            worker_result,
351            result_obtained_at,
352            locked_execution.parent,
353            can_be_retried,
354            unlock_expiry_on_limit_reached,
355        )? {
356            Some(append) => {
357                trace!("Appending {append:?}");
358                match append {
359                    AppendOrCancel::Cancel {
360                        execution_id,
361                        cancelled_at,
362                    } => db_exec
363                        .cancel_activity_with_retries(&execution_id, cancelled_at)
364                        .await
365                        .map(|_| ()),
366                    AppendOrCancel::Other(append) => append.append(db_exec).await,
367                }
368            }
369            None => Ok(()),
370        }
371    }
372
373    /// Map the `WorkerError` to an temporary or a permanent failure.
374    fn worker_result_to_execution_event(
375        execution_id: ExecutionId,
376        worker_result: WorkerResult,
377        result_obtained_at: DateTime<Utc>,
378        parent: Option<(ExecutionId, JoinSetId)>,
379        can_be_retried: Option<Duration>,
380        unlock_expiry_on_limit_reached: Duration,
381    ) -> Result<Option<AppendOrCancel>, DbErrorWrite> {
382        Ok(match worker_result {
383            WorkerResult::Ok(result, new_version, http_client_traces) => {
384                info!("Execution finished: {result}");
385                let child_finished =
386                    parent.map(
387                        |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
388                            parent_execution_id,
389                            parent_join_set,
390                            result: result.clone(),
391                        },
392                    );
393                let primary_event = AppendRequest {
394                    created_at: result_obtained_at,
395                    event: ExecutionEventInner::Finished {
396                        result,
397                        http_client_traces,
398                    },
399                };
400
401                Some(AppendOrCancel::Other(Append {
402                    created_at: result_obtained_at,
403                    primary_event,
404                    execution_id,
405                    version: new_version,
406                    child_finished,
407                }))
408            }
409            WorkerResult::DbUpdatedByWorkerOrWatcher => None,
410            WorkerResult::Err(err) => {
411                let reason_generic = err.to_string(); // Override with err's reason if no information is lost.
412
413                let (primary_event, child_finished, version) = match err {
414                    WorkerError::TemporaryTimeout {
415                        http_client_traces,
416                        version,
417                    } => {
418                        if let Some(duration) = can_be_retried {
419                            let backoff_expires_at = result_obtained_at + duration;
420                            info!(
421                                "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
422                            );
423                            (
424                                ExecutionEventInner::TemporarilyTimedOut {
425                                    backoff_expires_at,
426                                    http_client_traces,
427                                },
428                                None,
429                                version,
430                            )
431                        } else {
432                            info!("Execution timed out");
433                            let result = SupportedFunctionReturnValue::ExecutionError(
434                                FinishedExecutionError {
435                                    kind: ExecutionFailureKind::TimedOut,
436                                    reason: None,
437                                    detail: None,
438                                },
439                            );
440                            let child_finished =
441                                parent.map(|(parent_execution_id, parent_join_set)| {
442                                    ChildFinishedResponse {
443                                        parent_execution_id,
444                                        parent_join_set,
445                                        result: result.clone(),
446                                    }
447                                });
448                            (
449                                ExecutionEventInner::Finished {
450                                    result,
451                                    http_client_traces,
452                                },
453                                child_finished,
454                                version,
455                            )
456                        }
457                    }
458                    WorkerError::DbError(db_error) => {
459                        return Err(db_error);
460                    }
461                    WorkerError::ActivityTrap {
462                        reason: _, // reason_generic contains trap_kind + reason
463                        trap_kind,
464                        detail,
465                        version,
466                        http_client_traces,
467                    } => {
468                        if let Some(duration) = can_be_retried {
469                            let expires_at = result_obtained_at + duration;
470                            debug!(
471                                "Retrying activity with `{trap_kind}` execution after {duration:?} at {expires_at}"
472                            );
473                            (
474                                ExecutionEventInner::TemporarilyFailed {
475                                    reason: StrVariant::from(reason_generic),
476                                    backoff_expires_at: expires_at,
477                                    detail,
478                                    http_client_traces,
479                                },
480                                None,
481                                version,
482                            )
483                        } else {
484                            info!(
485                                "Activity with `{trap_kind}` marked as permanent failure - {reason_generic}"
486                            );
487                            let result = SupportedFunctionReturnValue::ExecutionError(
488                                FinishedExecutionError {
489                                    reason: Some(reason_generic),
490                                    kind: ExecutionFailureKind::Uncategorized,
491                                    detail,
492                                },
493                            );
494                            let child_finished =
495                                parent.map(|(parent_execution_id, parent_join_set)| {
496                                    ChildFinishedResponse {
497                                        parent_execution_id,
498                                        parent_join_set,
499                                        result: result.clone(),
500                                    }
501                                });
502                            (
503                                ExecutionEventInner::Finished {
504                                    result,
505                                    http_client_traces,
506                                },
507                                child_finished,
508                                version,
509                            )
510                        }
511                    }
512                    WorkerError::ActivityPreopenedDirError {
513                        reason,
514                        detail,
515                        version,
516                    } => {
517                        let http_client_traces = None;
518                        if let Some(duration) = can_be_retried {
519                            let expires_at = result_obtained_at + duration;
520                            debug!(
521                                "Retrying activity with ActivityPreopenedDirError `{reason}` execution after {duration:?} at {expires_at}"
522                            );
523                            (
524                                ExecutionEventInner::TemporarilyFailed {
525                                    reason: StrVariant::from(reason_generic),
526                                    backoff_expires_at: expires_at,
527                                    detail: Some(detail),
528                                    http_client_traces,
529                                },
530                                None,
531                                version,
532                            )
533                        } else {
534                            info!(
535                                "Activity with ActivityPreopenedDirError `{reason}` marked as permanent failure - {reason_generic}"
536                            );
537                            let result = SupportedFunctionReturnValue::ExecutionError(
538                                FinishedExecutionError {
539                                    reason: Some(reason_generic),
540                                    kind: ExecutionFailureKind::Uncategorized,
541                                    detail: Some(detail),
542                                },
543                            );
544                            let child_finished =
545                                parent.map(|(parent_execution_id, parent_join_set)| {
546                                    ChildFinishedResponse {
547                                        parent_execution_id,
548                                        parent_join_set,
549                                        result: result.clone(),
550                                    }
551                                });
552                            (
553                                ExecutionEventInner::Finished {
554                                    result,
555                                    http_client_traces,
556                                },
557                                child_finished,
558                                version,
559                            )
560                        }
561                    }
562                    WorkerError::ActivityReturnedError {
563                        detail,
564                        version,
565                        http_client_traces,
566                    } => {
567                        let duration = can_be_retried.expect(
568                            "ActivityReturnedError must not be returned when retries are exhausted",
569                        );
570                        let expires_at = result_obtained_at + duration;
571                        debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
572                        (
573                            ExecutionEventInner::TemporarilyFailed {
574                                backoff_expires_at: expires_at,
575                                reason: StrVariant::Static("activity returned error"), // is same as the variant's display message.
576                                detail, // contains the backtrace
577                                http_client_traces,
578                            },
579                            None,
580                            version,
581                        )
582                    }
583                    WorkerError::LimitReached {
584                        reason,
585                        version: new_version,
586                    } => {
587                        let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
588                        warn!(
589                            "Limit reached: {reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
590                        );
591                        (
592                            ExecutionEventInner::Unlocked {
593                                backoff_expires_at: expires_at,
594                                reason: StrVariant::from(reason),
595                            },
596                            None,
597                            new_version,
598                        )
599                    }
600                    WorkerError::FatalError(FatalError::Cancelled, _version) => {
601                        // use more optimized `cancel` to avoid polluting the logs with errors on a race.
602                        return Ok(Some(AppendOrCancel::Cancel {
603                            execution_id,
604                            cancelled_at: result_obtained_at,
605                        }));
606                    }
607                    WorkerError::FatalError(fatal_error, version) => {
608                        warn!("Fatal worker error - {fatal_error:?}");
609                        let result = SupportedFunctionReturnValue::ExecutionError(
610                            FinishedExecutionError::from(fatal_error),
611                        );
612                        let child_finished =
613                            parent.map(|(parent_execution_id, parent_join_set)| {
614                                ChildFinishedResponse {
615                                    parent_execution_id,
616                                    parent_join_set,
617                                    result: result.clone(),
618                                }
619                            });
620                        (
621                            ExecutionEventInner::Finished {
622                                result,
623                                http_client_traces: None,
624                            },
625                            child_finished,
626                            version,
627                        )
628                    }
629                };
630                Some(AppendOrCancel::Other(Append {
631                    created_at: result_obtained_at,
632                    primary_event: AppendRequest {
633                        created_at: result_obtained_at,
634                        event: primary_event,
635                    },
636                    execution_id,
637                    version,
638                    child_finished,
639                }))
640            }
641        })
642    }
643}
644
/// Result of a finished child execution, to be delivered to the parent's join set.
#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    pub(crate) parent_execution_id: ExecutionId,
    pub(crate) parent_join_set: JoinSetId,
    /// Clone of the result stored in the child's own `Finished` event.
    pub(crate) result: SupportedFunctionReturnValue,
}
651
/// What to persist for an execution after a worker run.
#[derive(Debug, Clone)]
#[expect(clippy::large_enum_variant)]
pub(crate) enum AppendOrCancel {
    /// The worker reported `FatalError::Cancelled`; persisted via `cancel_activity_with_retries`.
    Cancel {
        execution_id: ExecutionId,
        cancelled_at: DateTime<Utc>,
    },
    /// Append the described event(s) via `Append::append`.
    Other(Append),
}
661
/// An event to append to an execution after a worker run, optionally paired with a
/// response for the parent execution.
#[derive(Debug, Clone)]
pub(crate) struct Append {
    pub(crate) created_at: DateTime<Utc>,
    pub(crate) primary_event: AppendRequest,
    pub(crate) execution_id: ExecutionId,
    /// Version handed to the DB together with `primary_event`.
    pub(crate) version: Version,
    /// When set, `primary_event` must be `Finished` and `execution_id` must be
    /// `ExecutionId::Derived`; `Append::append` asserts both.
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}
670
671impl Append {
672    pub(crate) async fn append(self, db_exec: &dyn DbExecutor) -> Result<(), DbErrorWrite> {
673        if let Some(child_finished) = self.child_finished {
674            assert_matches!(
675                &self.primary_event,
676                AppendRequest {
677                    event: ExecutionEventInner::Finished { .. },
678                    ..
679                }
680            );
681            let child_execution_id = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
682            let events = AppendEventsToExecution {
683                execution_id: self.execution_id,
684                version: self.version.clone(),
685                batch: vec![self.primary_event],
686            };
687            let response = AppendResponseToExecution {
688                parent_execution_id: child_finished.parent_execution_id,
689                created_at: self.created_at,
690                join_set_id: child_finished.parent_join_set,
691                child_execution_id,
692                finished_version: self.version, // Since self.primary_event is a finished event, the version will remain the same.
693                result: child_finished.result,
694            };
695
696            db_exec
697                .append_batch_respond_to_parent(events, response, self.created_at)
698                .await?;
699        } else {
700            db_exec
701                .append(self.execution_id, self.version, self.primary_event)
702                .await?;
703        }
704        Ok(())
705    }
706}
707
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes, RETURN_TYPE_DUMMY,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");

    /// Scripted results keyed by the version the worker expects to be run with, each
    /// paired with the expected event history. Entries are consumed from the back via
    /// `IndexMap::pop` ("rev"), so the last entry is used first.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    /// Test worker that replays scripted [`WorkerResult`]s and asserts that each run
    /// sees the expected version and event history.
    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    /// Metadata of the single function a `SimpleWorker` exports.
    fn single_export(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
        [FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: RETURN_TYPE_DUMMY,
            extension: None,
            submittable: true,
        }]
    }

    impl SimpleWorker {
        /// Worker returning `res` once, expecting version 2 and an empty event history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let scripted = IndexMap::from([(Version::new(2), (vec![], res))]);
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(scripted)))
        }

        /// Same scripted results, but exporting `ffqn` instead.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            Self {
                exported: single_export(ffqn.clone()),
                ffqn,
                ..self
            }
        }

        /// Worker replaying `worker_results_rev` and exporting [`FFQN_SOME`].
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: single_export(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        /// Pops the next scripted result and returns it after checking the context.
        ///
        /// # Panics
        /// If no scripted result is left, or the version / event history differ from
        /// the scripted expectation.
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            let next = self.worker_results_rev.lock().unwrap().pop();
            let (expected_version, (expected_eh, worker_result)) = next.unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            let actual_eh: Vec<_> = ctx
                .event_history
                .iter()
                .map(|(event, _version)| event.clone())
                .collect();
            assert_eq!(expected_eh, actual_eh);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}
793
794#[cfg(test)]
795mod tests {
796    use self::simple_worker::SimpleWorker;
797    use super::*;
798    use crate::{expired_timers_watcher, worker::WorkerResult};
799    use assert_matches::assert_matches;
800    use async_trait::async_trait;
801    use concepts::storage::{
802        CreateRequest, DbConnection, JoinSetRequest, JoinSetResponse, JoinSetResponseEvent,
803    };
804    use concepts::storage::{DbPoolCloseable, LockedBy};
805    use concepts::storage::{ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState};
806    use concepts::time::Now;
807    use concepts::{
808        FunctionMetadata, JoinSetKind, ParameterTypes, Params, RETURN_TYPE_DUMMY,
809        SUPPORTED_RETURN_VALUE_OK_EMPTY, StrVariant, SupportedFunctionReturnValue, TrapKind,
810    };
811    use db_tests::Database;
812    use indexmap::IndexMap;
813    use simple_worker::FFQN_SOME;
814    use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
815    use test_utils::set_up;
816    use test_utils::sim_clock::{ConstClock, SimClock};
817
    /// FFQN exported by the child worker in the parent/child notification tests.
    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
819
820    async fn tick_fn<W: Worker + Debug, C: ClockFn + 'static>(
821        config: ExecConfig,
822        clock_fn: C,
823        db_exec: Arc<dyn DbExecutor>,
824        worker: Arc<W>,
825        executed_at: DateTime<Utc>,
826    ) -> Vec<ExecutionId> {
827        trace!("Ticking with {worker:?}");
828        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
829        let executor = ExecTask::new_test(worker, config, clock_fn, db_exec, ffqns);
830        executor
831            .tick_test_await(executed_at, RunId::generate())
832            .await
833    }
834
835    #[tokio::test]
836    async fn execute_simple_lifecycle_tick_based_mem() {
837        let created_at = Now.now();
838        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
839        execute_simple_lifecycle_tick_based(
840            db_pool.connection().as_ref(),
841            db_exec.clone(),
842            ConstClock(created_at),
843        )
844        .await;
845        db_close.close().await;
846    }
847
848    #[tokio::test]
849    async fn execute_simple_lifecycle_tick_based_sqlite() {
850        let created_at = Now.now();
851        let (_guard, db_pool, db_exec, db_close) = Database::Sqlite.set_up().await;
852        execute_simple_lifecycle_tick_based(
853            db_pool.connection().as_ref(),
854            db_exec.clone(),
855            ConstClock(created_at),
856        )
857        .await;
858        db_close.close().await;
859    }
860
861    async fn execute_simple_lifecycle_tick_based<C: ClockFn + 'static>(
862        db_connection: &dyn DbConnection,
863        db_exec: Arc<dyn DbExecutor>,
864        clock_fn: C,
865    ) {
866        set_up();
867        let created_at = clock_fn.now();
868        let exec_config = ExecConfig {
869            batch_size: 1,
870            lock_expiry: Duration::from_secs(1),
871            tick_sleep: Duration::from_millis(100),
872            component_id: ComponentId::dummy_activity(),
873            task_limiter: None,
874            executor_id: ExecutorId::generate(),
875            retry_config: ComponentRetryConfig::ZERO,
876        };
877
878        let execution_log = create_and_tick(
879            CreateAndTickConfig {
880                execution_id: ExecutionId::generate(),
881                created_at,
882                executed_at: created_at,
883            },
884            clock_fn,
885            db_connection,
886            db_exec,
887            exec_config,
888            Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
889                SUPPORTED_RETURN_VALUE_OK_EMPTY,
890                Version::new(2),
891                None,
892            ))),
893            tick_fn,
894        )
895        .await;
896        assert_matches!(
897            execution_log.events.get(2).unwrap(),
898            ExecutionEvent {
899                event: ExecutionEventInner::Finished {
900                    result: SupportedFunctionReturnValue::Ok { ok: None },
901                    http_client_traces: None
902                },
903                created_at: _,
904                backtrace_id: None,
905                version: Version(2),
906            }
907        );
908    }
909
910    #[tokio::test]
911    async fn execute_simple_lifecycle_task_based_mem() {
912        set_up();
913        let created_at = Now.now();
914        let clock_fn = ConstClock(created_at);
915        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
916        let exec_config = ExecConfig {
917            batch_size: 1,
918            lock_expiry: Duration::from_secs(1),
919            tick_sleep: Duration::ZERO,
920            component_id: ComponentId::dummy_activity(),
921            task_limiter: None,
922            executor_id: ExecutorId::generate(),
923            retry_config: ComponentRetryConfig::ZERO,
924        };
925
926        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
927            SUPPORTED_RETURN_VALUE_OK_EMPTY,
928            Version::new(2),
929            None,
930        )));
931
932        let execution_log = create_and_tick(
933            CreateAndTickConfig {
934                execution_id: ExecutionId::generate(),
935                created_at,
936                executed_at: created_at,
937            },
938            clock_fn,
939            db_pool.connection().as_ref(),
940            db_exec,
941            exec_config,
942            worker,
943            tick_fn,
944        )
945        .await;
946        assert_matches!(
947            execution_log.events.get(2).unwrap(),
948            ExecutionEvent {
949                event: ExecutionEventInner::Finished {
950                    result: SupportedFunctionReturnValue::Ok { ok: None },
951                    http_client_traces: None
952                },
953                created_at: _,
954                backtrace_id: None,
955                version: Version(2),
956            }
957        );
958        db_close.close().await;
959    }
960
961    struct CreateAndTickConfig {
962        execution_id: ExecutionId,
963        created_at: DateTime<Utc>,
964        executed_at: DateTime<Utc>,
965    }
966
967    async fn create_and_tick<
968        W: Worker,
969        C: ClockFn,
970        T: FnMut(ExecConfig, C, Arc<dyn DbExecutor>, Arc<W>, DateTime<Utc>) -> F,
971        F: Future<Output = Vec<ExecutionId>>,
972    >(
973        config: CreateAndTickConfig,
974        clock_fn: C,
975        db_connection: &dyn DbConnection,
976        db_exec: Arc<dyn DbExecutor>,
977        exec_config: ExecConfig,
978        worker: Arc<W>,
979        mut tick: T,
980    ) -> ExecutionLog {
981        // Create an execution
982        db_connection
983            .create(CreateRequest {
984                created_at: config.created_at,
985                execution_id: config.execution_id.clone(),
986                ffqn: FFQN_SOME,
987                params: Params::empty(),
988                parent: None,
989                metadata: concepts::ExecutionMetadata::empty(),
990                scheduled_at: config.created_at,
991                component_id: ComponentId::dummy_activity(),
992                scheduled_by: None,
993            })
994            .await
995            .unwrap();
996        // execute!
997        tick(exec_config, clock_fn, db_exec, worker, config.executed_at).await;
998        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
999        debug!("Execution history after tick: {execution_log:?}");
1000        // check that DB contains Created and Locked events.
1001        assert_matches!(
1002            execution_log.events.first().unwrap(),
1003            ExecutionEvent {
1004                event: ExecutionEventInner::Created { .. },
1005                created_at: actually_created_at,
1006                backtrace_id: None,
1007                version: Version(0),
1008            }
1009            if config.created_at == *actually_created_at
1010        );
1011        let locked_at = assert_matches!(
1012            execution_log.events.get(1).unwrap(),
1013            ExecutionEvent {
1014                event: ExecutionEventInner::Locked { .. },
1015                created_at: locked_at,
1016                backtrace_id: None,
1017                version: Version(1),
1018            } if config.created_at <= *locked_at
1019            => *locked_at
1020        );
1021        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
1022            event: _,
1023            created_at: executed_at,
1024            backtrace_id: None,
1025            version: Version(2),
1026        } if *executed_at >= locked_at);
1027        execution_log
1028    }
1029
    /// An activity trap with `max_retries: 1` must be recorded as `TemporarilyFailed`
    /// with a backoff of `retry_exp_backoff`. Ticks before the backoff expires must be
    /// no-ops. Once the simulated clock passes the backoff, the execution is locked again
    /// and finishes successfully.
    #[tokio::test]
    async fn activity_trap_should_trigger_an_execution_retry() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
        let retry_exp_backoff = Duration::from_millis(100);
        let retry_config = ComponentRetryConfig {
            max_retries: 1,
            retry_exp_backoff,
        };
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
            retry_config,
        };
        let expected_reason = "error reason";
        let expected_detail = "error detail";
        // First run (version 2) traps.
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: Some(expected_detail.to_string()),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at: sim_clock.now(),
                executed_at: sim_clock.now(),
            },
            sim_clock.clone(),
            db_pool.connection().as_ref(),
            db_exec.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        // Created, Locked, TemporarilyFailed.
        assert_eq!(3, execution_log.events.len());
        {
            let (reason, detail, at, expires_at) = assert_matches!(
                &execution_log.events.get(2).unwrap(),
                ExecutionEvent {
                    event: ExecutionEventInner::TemporarilyFailed {
                        reason,
                        detail,
                        backoff_expires_at,
                        http_client_traces: None,
                    },
                    created_at: at,
                    backtrace_id: None,
                    version: Version(2),
                }
                => (reason, detail, *at, *backoff_expires_at)
            );
            // The stored reason carries an "activity trap: " prefix.
            assert_eq!(format!("activity trap: {expected_reason}"), reason.deref());
            assert_eq!(Some(expected_detail), detail.as_deref());
            assert_eq!(at, sim_clock.now());
            assert_eq!(sim_clock.now() + retry_config.retry_exp_backoff, expires_at);
        }
        // Retry is scripted at version 4: created = 0, locked = 1,
        // temporarily failed = 2, locked again = 3.
        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
            std::sync::Mutex::new(IndexMap::from([(
                Version::new(4),
                (
                    vec![],
                    WorkerResult::Ok(SUPPORTED_RETURN_VALUE_OK_EMPTY, Version::new(4), None),
                ),
            )])),
        )));
        // noop until `retry_exp_backoff` expires
        assert!(
            tick_fn(
                exec_config.clone(),
                sim_clock.clone(),
                db_exec.clone(),
                worker.clone(),
                sim_clock.now(),
            )
            .await
            .is_empty()
        );
        // tick again to finish the execution
        sim_clock.move_time_forward(retry_config.retry_exp_backoff);
        tick_fn(
            exec_config,
            sim_clock.clone(),
            db_exec.clone(),
            worker,
            sim_clock.now(),
        )
        .await;
        let execution_log = {
            let db_connection = db_pool.connection();
            db_connection
                .get(&execution_log.execution_id)
                .await
                .unwrap()
        };
        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
        assert_matches!(
            execution_log.events.get(3).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: at,
                backtrace_id: None,
                version: Version(3),
            } if *at == sim_clock.now()
        );
        assert_matches!(
            execution_log.events.get(4).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: SupportedFunctionReturnValue::Ok{ok:None},
                    http_client_traces: None
                },
                created_at: finished_at,
                backtrace_id: None,
                version: Version(4),
            } if *finished_at == sim_clock.now()
        );
        db_close.close().await;
    }
1159
1160    #[tokio::test]
1161    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1162        set_up();
1163        let created_at = Now.now();
1164        let clock_fn = ConstClock(created_at);
1165        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1166        let exec_config = ExecConfig {
1167            batch_size: 1,
1168            lock_expiry: Duration::from_secs(1),
1169            tick_sleep: Duration::ZERO,
1170            component_id: ComponentId::dummy_activity(),
1171            task_limiter: None,
1172            executor_id: ExecutorId::generate(),
1173            retry_config: ComponentRetryConfig::ZERO,
1174        };
1175
1176        let reason = "error reason";
1177        let expected_reason = format!("activity trap: {reason}");
1178        let expected_detail = "error detail";
1179        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1180            WorkerError::ActivityTrap {
1181                reason: reason.to_string(),
1182                trap_kind: concepts::TrapKind::Trap,
1183                detail: Some(expected_detail.to_string()),
1184                version: Version::new(2),
1185                http_client_traces: None,
1186            },
1187        )));
1188        let execution_log = create_and_tick(
1189            CreateAndTickConfig {
1190                execution_id: ExecutionId::generate(),
1191                created_at,
1192                executed_at: created_at,
1193            },
1194            clock_fn,
1195            db_pool.connection().as_ref(),
1196            db_exec,
1197            exec_config.clone(),
1198            worker,
1199            tick_fn,
1200        )
1201        .await;
1202        assert_eq!(3, execution_log.events.len());
1203        let (reason, kind, detail) = assert_matches!(
1204            &execution_log.events.get(2).unwrap(),
1205            ExecutionEvent {
1206                event: ExecutionEventInner::Finished{
1207                    result: SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError{reason, kind, detail}),
1208                    http_client_traces: None
1209                },
1210                created_at: at,
1211                backtrace_id: None,
1212                version: Version(2),
1213            } if *at == created_at
1214            => (reason, kind, detail)
1215        );
1216
1217        assert_eq!(Some(expected_reason), *reason);
1218        assert_eq!(Some(expected_detail), detail.as_deref());
1219        assert_eq!(ExecutionFailureKind::Uncategorized, *kind);
1220
1221        db_close.close().await;
1222    }
1223
1224    #[tokio::test]
1225    async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1226        let worker_error = WorkerError::ActivityTrap {
1227            reason: "error reason".to_string(),
1228            trap_kind: TrapKind::Trap,
1229            detail: Some("detail".to_string()),
1230            version: Version::new(2),
1231            http_client_traces: None,
1232        };
1233        let expected_child_err = FinishedExecutionError {
1234            kind: ExecutionFailureKind::Uncategorized,
1235            reason: Some("activity trap: error reason".to_string()),
1236            detail: Some("detail".to_string()),
1237        };
1238        child_execution_permanently_failed_should_notify_parent(
1239            WorkerResult::Err(worker_error),
1240            expected_child_err,
1241        )
1242        .await;
1243    }
1244
1245    #[tokio::test]
1246    async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1247        let expected_child_err = FinishedExecutionError {
1248            kind: ExecutionFailureKind::TimedOut,
1249            reason: None,
1250            detail: None,
1251        };
1252        child_execution_permanently_failed_should_notify_parent(
1253            WorkerResult::DbUpdatedByWorkerOrWatcher,
1254            expected_child_err,
1255        )
1256        .await;
1257    }
1258
    /// Shared scenario for the child-failure tests. It proceeds in four steps:
    /// 1. Ticks a parent whose worker returns `DbUpdatedByWorkerOrWatcher`.
    /// 2. Manually appends the join-set / child-request / join-next events a workflow
    ///    worker would have written, and creates the child.
    /// 3. Runs the child with `worker_result`. When `expected_child_err.kind` is
    ///    `TimedOut`, it also expires the child's lock and ticks the expired-timers watcher.
    /// 4. Verifies the child finished with `expected_child_err` and the parent is pending
    ///    again with a `ChildExecutionFinished` response.
    async fn child_execution_permanently_failed_should_notify_parent(
        worker_result: WorkerResult,
        expected_child_err: FinishedExecutionError,
    ) {
        use concepts::storage::JoinSetResponseEventOuter;
        const LOCK_EXPIRY: Duration = Duration::from_secs(1);

        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;

        let parent_worker = Arc::new(SimpleWorker::with_single_result(
            WorkerResult::DbUpdatedByWorkerOrWatcher,
        ));
        let parent_execution_id = ExecutionId::generate();
        db_pool
            .connection()
            .create(CreateRequest {
                created_at: sim_clock.now(),
                execution_id: parent_execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();
        // Remembered so the parent's `last_lock` can be checked at the end.
        let parent_executor_id = ExecutorId::generate();
        tick_fn(
            ExecConfig {
                batch_size: 1,
                lock_expiry: LOCK_EXPIRY,
                tick_sleep: Duration::ZERO,
                component_id: ComponentId::dummy_activity(),
                task_limiter: None,
                executor_id: parent_executor_id,
                retry_config: ComponentRetryConfig::ZERO,
            },
            sim_clock.clone(),
            db_exec.clone(),
            parent_worker,
            sim_clock.now(),
        )
        .await;

        let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
        let child_execution_id = parent_execution_id.next_level(&join_set_id);
        // executor does not append anything, this should have been written by the worker:
        {
            let params = Params::empty();
            let child = CreateRequest {
                created_at: sim_clock.now(),
                execution_id: ExecutionId::Derived(child_execution_id.clone()),
                ffqn: FFQN_CHILD,
                params: params.clone(),
                parent: Some((parent_execution_id.clone(), join_set_id.clone())),
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            };
            let current_time = sim_clock.now();
            let join_set = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinSetCreate {
                        join_set_id: join_set_id.clone(),
                    },
                },
            };
            let child_exec_req = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinSetRequest {
                        join_set_id: join_set_id.clone(),
                        request: JoinSetRequest::ChildExecutionRequest {
                            child_execution_id: child_execution_id.clone(),
                            target_ffqn: FFQN_CHILD,
                            params,
                        },
                    },
                },
            };
            let join_next = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinNext {
                        join_set_id: join_set_id.clone(),
                        run_expires_at: sim_clock.now(),
                        closing: false,
                        requested_ffqn: Some(FFQN_CHILD),
                    },
                },
            };
            // Presumably version 2 because the parent has Created (0) and Locked (1) — confirm.
            db_pool
                .connection()
                .append_batch_create_new_execution(
                    current_time,
                    vec![join_set, child_exec_req, join_next],
                    parent_execution_id.clone(),
                    Version::new(2),
                    vec![child],
                )
                .await
                .unwrap();
        }

        let child_worker =
            Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));

        // execute the child
        tick_fn(
            ExecConfig {
                batch_size: 1,
                lock_expiry: LOCK_EXPIRY,
                tick_sleep: Duration::ZERO,
                component_id: ComponentId::dummy_activity(),
                task_limiter: None,
                executor_id: ExecutorId::generate(),
                retry_config: ComponentRetryConfig::ZERO,
            },
            sim_clock.clone(),
            db_exec.clone(),
            child_worker,
            sim_clock.now(),
        )
        .await;
        if matches!(expected_child_err.kind, ExecutionFailureKind::TimedOut) {
            // In case of timeout, let the timers watcher handle it
            sim_clock.move_time_forward(LOCK_EXPIRY);
            expired_timers_watcher::tick(db_pool.connection().as_ref(), sim_clock.now())
                .await
                .unwrap();
        }
        let child_log = db_pool
            .connection()
            .get(&ExecutionId::Derived(child_execution_id.clone()))
            .await
            .unwrap();
        assert!(child_log.pending_state.is_finished());
        assert_eq!(
            Version(2),
            child_log.next_version,
            "created = 0, locked = 1, with_single_result = 2"
        );
        assert_eq!(
            ExecutionEventInner::Finished {
                result: SupportedFunctionReturnValue::ExecutionError(expected_child_err),
                http_client_traces: None
            },
            child_log.last_event().event
        );
        let parent_log = db_pool
            .connection()
            .get(&parent_execution_id)
            .await
            .unwrap();
        // The child's response must have rescheduled the parent at the current time,
        // still recording the parent's original executor as its last lock.
        assert_matches!(
            parent_log.pending_state,
            PendingState::PendingAt {
                scheduled_at,
                last_lock: Some(LockedBy { executor_id: found_executor_id, run_id: _}),
            } if scheduled_at == sim_clock.now() && found_executor_id == parent_executor_id,
            "parent should be back to pending"
        );
        let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
            parent_log.responses.last(),
            Some(JoinSetResponseEventOuter{
                created_at: at,
                event: JoinSetResponseEvent{
                    join_set_id: found_join_set_id,
                    event: JoinSetResponse::ChildExecutionFinished {
                        child_execution_id: found_child_execution_id,
                        finished_version,
                        result: found_result,
                    }
                }
            })
             if *at == sim_clock.now()
            => (found_join_set_id, found_child_execution_id, finished_version, found_result)
        );
        assert_eq!(join_set_id, *found_join_set_id);
        assert_eq!(child_execution_id, *found_child_execution_id);
        assert_eq!(child_log.next_version, *child_finished_version);
        // NOTE(review): only the variant is checked here. Consider asserting equality with
        // the child's finished result. That would require cloning `expected_child_err`
        // before it is moved into the assertion above. Confirm first that the watcher path
        // reports the same result.
        assert_matches!(
            found_result,
            SupportedFunctionReturnValue::ExecutionError(_)
        );

        db_close.close().await;
    }
1453
1454    #[derive(Clone, Debug)]
1455    struct SleepyWorker {
1456        duration: Duration,
1457        result: SupportedFunctionReturnValue,
1458        exported: [FunctionMetadata; 1],
1459    }
1460
1461    #[async_trait]
1462    impl Worker for SleepyWorker {
1463        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1464            tokio::time::sleep(self.duration).await;
1465            WorkerResult::Ok(self.result.clone(), ctx.version, None)
1466        }
1467
1468        fn exported_functions(&self) -> &[FunctionMetadata] {
1469            &self.exported
1470        }
1471    }
1472
1473    #[tokio::test]
1474    async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1475        set_up();
1476        let sim_clock = SimClock::default();
1477        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1478        let lock_expiry = Duration::from_millis(100);
1479        let timeout_duration = Duration::from_millis(300);
1480        let retry_config = ComponentRetryConfig {
1481            max_retries: 1,
1482            retry_exp_backoff: timeout_duration,
1483        };
1484        let exec_config = ExecConfig {
1485            batch_size: 1,
1486            lock_expiry,
1487            tick_sleep: Duration::ZERO,
1488            component_id: ComponentId::dummy_activity(),
1489            task_limiter: None,
1490            executor_id: ExecutorId::generate(),
1491            retry_config,
1492        };
1493
1494        let worker = Arc::new(SleepyWorker {
1495            duration: lock_expiry + Duration::from_millis(1), // sleep more than allowed by the lock expiry
1496            result: SUPPORTED_RETURN_VALUE_OK_EMPTY,
1497            exported: [FunctionMetadata {
1498                ffqn: FFQN_SOME,
1499                parameter_types: ParameterTypes::default(),
1500                return_type: RETURN_TYPE_DUMMY,
1501                extension: None,
1502                submittable: true,
1503            }],
1504        });
1505        // Create an execution
1506        let execution_id = ExecutionId::generate();
1507        let db_connection = db_pool.connection();
1508        db_connection
1509            .create(CreateRequest {
1510                created_at: sim_clock.now(),
1511                execution_id: execution_id.clone(),
1512                ffqn: FFQN_SOME,
1513                params: Params::empty(),
1514                parent: None,
1515                metadata: concepts::ExecutionMetadata::empty(),
1516                scheduled_at: sim_clock.now(),
1517                component_id: ComponentId::dummy_activity(),
1518                scheduled_by: None,
1519            })
1520            .await
1521            .unwrap();
1522
1523        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
1524        let executor = ExecTask::new_test(
1525            worker,
1526            exec_config.clone(),
1527            sim_clock.clone(),
1528            db_exec.clone(),
1529            ffqns,
1530        );
1531        let mut first_execution_progress = executor
1532            .tick(sim_clock.now(), RunId::generate())
1533            .await
1534            .unwrap();
1535        assert_eq!(1, first_execution_progress.executions.len());
1536        // Started hanging, wait for lock expiry.
1537        sim_clock.move_time_forward(lock_expiry);
1538        // cleanup should be called
1539        let now_after_first_lock_expiry = sim_clock.now();
1540        {
1541            debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1542            let cleanup_progress = executor
1543                .tick(now_after_first_lock_expiry, RunId::generate())
1544                .await
1545                .unwrap();
1546            assert!(cleanup_progress.executions.is_empty());
1547        }
1548        {
1549            let expired_locks = expired_timers_watcher::tick(
1550                db_pool.connection().as_ref(),
1551                now_after_first_lock_expiry,
1552            )
1553            .await
1554            .unwrap()
1555            .expired_locks;
1556            assert_eq!(1, expired_locks);
1557        }
1558        assert!(
1559            !first_execution_progress
1560                .executions
1561                .pop()
1562                .unwrap()
1563                .1
1564                .is_finished()
1565        );
1566
1567        let execution_log = db_connection.get(&execution_id).await.unwrap();
1568        let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1569        assert_matches!(
1570            &execution_log.events.get(2).unwrap(),
1571            ExecutionEvent {
1572                event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
1573                created_at: at,
1574                backtrace_id: None,
1575                version: Version(2),
1576            } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1577        );
1578        assert_matches!(
1579            execution_log.pending_state,
1580            PendingState::PendingAt {
1581                scheduled_at: found_scheduled_by,
1582                last_lock: Some(LockedBy {
1583                    executor_id: found_executor_id,
1584                    run_id: _,
1585                }),
1586            } if found_scheduled_by == expected_first_timeout_expiry && found_executor_id == exec_config.executor_id
1587        );
1588        sim_clock.move_time_forward(timeout_duration);
1589        let now_after_first_timeout = sim_clock.now();
1590        debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1591
1592        let mut second_execution_progress = executor
1593            .tick(now_after_first_timeout, RunId::generate())
1594            .await
1595            .unwrap();
1596        assert_eq!(1, second_execution_progress.executions.len());
1597
1598        // Started hanging, wait for lock expiry.
1599        sim_clock.move_time_forward(lock_expiry);
1600        // cleanup should be called
1601        let now_after_second_lock_expiry = sim_clock.now();
1602        debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1603        {
1604            let cleanup_progress = executor
1605                .tick(now_after_second_lock_expiry, RunId::generate())
1606                .await
1607                .unwrap();
1608            assert!(cleanup_progress.executions.is_empty());
1609        }
1610        {
1611            let expired_locks = expired_timers_watcher::tick(
1612                db_pool.connection().as_ref(),
1613                now_after_second_lock_expiry,
1614            )
1615            .await
1616            .unwrap()
1617            .expired_locks;
1618            assert_eq!(1, expired_locks);
1619        }
1620        assert!(
1621            !second_execution_progress
1622                .executions
1623                .pop()
1624                .unwrap()
1625                .1
1626                .is_finished()
1627        );
1628
1629        drop(db_connection);
1630        drop(executor);
1631        db_close.close().await;
1632    }
1633}