obeli_sk_executor/
executor.rs

1use crate::worker::{FatalError, Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::prefixed_ulid::RunId;
5use concepts::storage::{
6    AppendEventsToExecution, AppendRequest, AppendResponseToExecution, DbErrorGeneric,
7    DbErrorWrite, DbExecutor, ExecutionLog, LockedExecution,
8};
9use concepts::time::{ClockFn, Sleep};
10use concepts::{
11    ComponentId, ComponentRetryConfig, FunctionMetadata, StrVariant, SupportedFunctionReturnValue,
12};
13use concepts::{ExecutionFailureKind, JoinSetId};
14use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
15use concepts::{
16    FinishedExecutionError,
17    storage::{ExecutionRequest, Version},
18};
19use std::{
20    sync::{
21        Arc,
22        atomic::{AtomicBool, Ordering},
23    },
24    time::Duration,
25};
26use tokio::task::{AbortHandle, JoinHandle};
27use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
28
29#[derive(Debug, Clone)]
30pub struct ExecConfig {
31    pub lock_expiry: Duration,
32    pub tick_sleep: Duration,
33    pub batch_size: u32,
34    pub component_id: ComponentId,
35    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
36    pub executor_id: ExecutorId,
37    pub retry_config: ComponentRetryConfig,
38    pub locking_strategy: LockingStrategy,
39}
40
41pub struct ExecTask<C: ClockFn> {
42    worker: Arc<dyn Worker>,
43    config: ExecConfig,
44    clock_fn: C, // Used for obtaining current time when the execution finishes.
45    db_exec: Arc<dyn DbExecutor>,
46    locking_strategy_holder: LockingStrategyHolder,
47}
48
49#[derive(derive_more::Debug, Default)]
50pub struct ExecutionProgress {
51    #[debug(skip)]
52    #[allow(dead_code)]
53    executions: Vec<(ExecutionId, JoinHandle<()>)>,
54}
55
56impl ExecutionProgress {
57    #[cfg(feature = "test")]
58    pub async fn wait_for_tasks(self) -> Vec<ExecutionId> {
59        let mut vec = Vec::new();
60        for (exe, join_handle) in self.executions {
61            vec.push(exe);
62            join_handle.await.unwrap();
63        }
64        vec
65    }
66}
67
68#[derive(derive_more::Debug)]
69pub struct ExecutorTaskHandle {
70    #[debug(skip)]
71    is_closing: Arc<AtomicBool>,
72    #[debug(skip)]
73    abort_handle: AbortHandle,
74    component_id: ComponentId,
75    executor_id: ExecutorId,
76}
77
78impl ExecutorTaskHandle {
79    #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
80    pub async fn close(&self) {
81        trace!("Gracefully closing");
82        // TODO: Close all the workers tasks as well.
83        // Simple solution:
84        // Attempt to unlock all in-progress executions. Activities will not be penalized for shutdown, although the number of retries would increase,
85        // some progress will be lost.
86        // More complex solution:
87        // Activities will be awaited, up to some deadline, then unlocked.
88        // Workflows that are awating should be signaled using `DeadlineTracker`.
89        self.is_closing.store(true, Ordering::Relaxed);
90        while !self.abort_handle.is_finished() {
91            tokio::time::sleep(Duration::from_millis(1)).await;
92        }
93        debug!("Gracefully closed");
94    }
95}
96
97impl Drop for ExecutorTaskHandle {
98    #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
99    fn drop(&mut self) {
100        if self.abort_handle.is_finished() {
101            return;
102        }
103        warn!("Aborting the executor task");
104        self.abort_handle.abort();
105    }
106}
107
/// Public wrapper around `extract_exported_ffqns_noext`, available with the `test` feature.
#[cfg(feature = "test")]
pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    extract_exported_ffqns_noext(worker)
}
112
113fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
114    worker
115        .exported_functions()
116        .iter()
117        .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
118        .collect::<Arc<_>>()
119}
120
/// Selects how the executor looks up pending executions to lock.
#[derive(Clone, Copy, Debug, Default)]
pub enum LockingStrategy {
    /// Lock pending executions matching a list of function names (the default).
    #[default]
    ByFfqns,
    /// Lock pending executions belonging to the executor's component id.
    ByComponentId,
}
127impl LockingStrategy {
128    fn holder(&self, ffqns: Arc<[FunctionFqn]>) -> LockingStrategyHolder {
129        match self {
130            LockingStrategy::ByFfqns => LockingStrategyHolder::ByFfqns(ffqns),
131            LockingStrategy::ByComponentId => LockingStrategyHolder::ByComponentId,
132        }
133    }
134}
135
136enum LockingStrategyHolder {
137    ByFfqns(Arc<[FunctionFqn]>),
138    ByComponentId,
139}
140
141impl<C: ClockFn + 'static> ExecTask<C> {
142    #[cfg(feature = "test")]
143    pub fn new_test(
144        worker: Arc<dyn Worker>,
145        config: ExecConfig,
146        clock_fn: C,
147        db_exec: Arc<dyn DbExecutor>,
148        ffqns: Arc<[FunctionFqn]>,
149    ) -> Self {
150        Self {
151            worker,
152            locking_strategy_holder: config.locking_strategy.holder(ffqns),
153            config,
154            clock_fn,
155            db_exec,
156        }
157    }
158
159    #[cfg(feature = "test")]
160    pub fn new_all_ffqns_test(
161        worker: Arc<dyn Worker>,
162        config: ExecConfig,
163        clock_fn: C,
164        db_exec: Arc<dyn DbExecutor>,
165    ) -> Self {
166        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
167        Self {
168            worker,
169            locking_strategy_holder: config.locking_strategy.holder(ffqns),
170            config,
171            clock_fn,
172            db_exec,
173        }
174    }
175
176    pub fn spawn_new(
177        worker: Arc<dyn Worker>,
178        config: ExecConfig,
179        clock_fn: C,
180        db_exec: Arc<dyn DbExecutor>,
181        sleep: impl Sleep + 'static,
182    ) -> ExecutorTaskHandle {
183        let is_closing = Arc::new(AtomicBool::default());
184        let is_closing_inner = is_closing.clone();
185        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
186        let component_id = config.component_id.clone();
187        let executor_id = config.executor_id;
188        let abort_handle = tokio::spawn(async move {
189            debug!(executor_id = %config.executor_id, component_id = %config.component_id, "Spawned executor");
190            let lock_strategy_holder = config.locking_strategy.holder(ffqns);
191            let task = ExecTask {
192                worker,
193                config,
194                db_exec,
195                locking_strategy_holder: lock_strategy_holder,
196                clock_fn: clock_fn.clone(),
197            };
198            while !is_closing_inner.load(Ordering::Relaxed) {
199                let _ = task.tick(clock_fn.now(), RunId::generate()).await;
200
201                task.db_exec
202                    .wait_for_pending_by_component_id(clock_fn.now(), &task.config.component_id, {
203                        let sleep = sleep.clone();
204                        Box::pin(async move { sleep.sleep(task.config.tick_sleep).await })})
205                    .await;
206            }
207        })
208        .abort_handle();
209        ExecutorTaskHandle {
210            is_closing,
211            abort_handle,
212            component_id,
213            executor_id,
214        }
215    }
216
217    fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
218        if let Some(task_limiter) = &self.config.task_limiter {
219            let mut locks = Vec::new();
220            for _ in 0..self.config.batch_size {
221                if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
222                    locks.push(Some(permit));
223                } else {
224                    break;
225                }
226            }
227            locks
228        } else {
229            let mut vec = Vec::with_capacity(self.config.batch_size as usize);
230            for _ in 0..self.config.batch_size {
231                vec.push(None);
232            }
233            vec
234        }
235    }
236
237    #[cfg(feature = "test")]
238    pub async fn tick_test(&self, executed_at: DateTime<Utc>, run_id: RunId) -> ExecutionProgress {
239        self.tick(executed_at, run_id).await.unwrap()
240    }
241
242    #[cfg(feature = "test")]
243    pub async fn tick_test_await(
244        &self,
245        executed_at: DateTime<Utc>,
246        run_id: RunId,
247    ) -> Vec<ExecutionId> {
248        self.tick(executed_at, run_id)
249            .await
250            .unwrap()
251            .wait_for_tasks()
252            .await
253    }
254
255    #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.config.executor_id, component_id = %self.config.component_id))]
256    async fn tick(
257        &self,
258        executed_at: DateTime<Utc>,
259        run_id: RunId,
260    ) -> Result<ExecutionProgress, DbErrorGeneric> {
261        let locked_executions = {
262            let mut permits = self.acquire_task_permits();
263            if permits.is_empty() {
264                return Ok(ExecutionProgress::default());
265            }
266            let lock_expires_at = executed_at + self.config.lock_expiry;
267            let locked_executions = match &self.locking_strategy_holder {
268                LockingStrategyHolder::ByFfqns(ffqns) => {
269                    self.db_exec
270                        .lock_pending_by_ffqns(
271                            permits.len(), // batch size
272                            executed_at,   // fetch expiring before now
273                            ffqns.clone(),
274                            executed_at, // created at
275                            self.config.component_id.clone(),
276                            self.config.executor_id,
277                            lock_expires_at,
278                            run_id,
279                            self.config.retry_config,
280                        )
281                        .await?
282                }
283                LockingStrategyHolder::ByComponentId => {
284                    self.db_exec
285                        .lock_pending_by_component_id(
286                            permits.len(), // batch size
287                            executed_at,   // pending_at_or_sooner
288                            &self.config.component_id,
289                            executed_at, // created at
290                            self.config.executor_id,
291                            lock_expires_at,
292                            run_id,
293                            self.config.retry_config,
294                        )
295                        .await?
296                }
297            };
298            // Drop permits if too many were allocated.
299            while permits.len() > locked_executions.len() {
300                permits.pop();
301            }
302            assert_eq!(permits.len(), locked_executions.len());
303            locked_executions.into_iter().zip(permits)
304        };
305
306        let mut executions = Vec::with_capacity(locked_executions.len());
307        for (locked_execution, permit) in locked_executions {
308            let execution_id = locked_execution.execution_id.clone();
309            let join_handle = {
310                let worker = self.worker.clone();
311                let db = self.db_exec.clone();
312                let clock_fn = self.clock_fn.clone();
313                let worker_span = info_span!(parent: None, "worker",
314                    "otel.name" = format!("worker {}", locked_execution.ffqn),
315                    %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.config.executor_id, component_id = %self.config.component_id);
316                locked_execution.metadata.enrich(&worker_span);
317                tokio::spawn({
318                    let worker_span2 = worker_span.clone();
319                    let retry_config = self.config.retry_config;
320                    async move {
321                        let _permit = permit;
322                        let res = Self::run_worker(
323                            worker,
324                            db.as_ref(),
325                            clock_fn,
326                            locked_execution,
327                            retry_config,
328                            worker_span2,
329                        )
330                        .await;
331                        if let Err(db_error) = res {
332                            error!("Got db error `{db_error:?}`, expecting watcher to mark execution as timed out");
333                        }
334                    }
335                    .instrument(worker_span)
336                })
337            };
338            executions.push((execution_id, join_handle));
339        }
340        Ok(ExecutionProgress { executions })
341    }
342
343    async fn run_worker(
344        worker: Arc<dyn Worker>,
345        db_exec: &dyn DbExecutor,
346        clock_fn: C,
347        locked_execution: LockedExecution,
348        retry_config: ComponentRetryConfig,
349        worker_span: Span,
350    ) -> Result<(), DbErrorWrite> {
351        debug!("Worker::run starting");
352        trace!(
353            version = %locked_execution.next_version,
354            params = ?locked_execution.params,
355            event_history = ?locked_execution.event_history,
356            "Worker::run starting"
357        );
358        let can_be_retried = ExecutionLog::can_be_retried_after(
359            locked_execution.intermittent_event_count + 1,
360            retry_config.max_retries,
361            retry_config.retry_exp_backoff,
362        );
363        let unlock_expiry_on_limit_reached =
364            ExecutionLog::compute_retry_duration_when_retrying_forever(
365                locked_execution.intermittent_event_count + 1,
366                retry_config.retry_exp_backoff,
367            );
368        let ctx = WorkerContext {
369            execution_id: locked_execution.execution_id.clone(),
370            metadata: locked_execution.metadata,
371            ffqn: locked_execution.ffqn,
372            params: locked_execution.params,
373            event_history: locked_execution.event_history,
374            responses: locked_execution
375                .responses
376                .into_iter()
377                .map(|outer| outer.event)
378                .collect(),
379            version: locked_execution.next_version,
380            can_be_retried: can_be_retried.is_some(),
381            locked_event: locked_execution.locked_event,
382            worker_span,
383        };
384        let worker_result = worker.run(ctx).await;
385        trace!(?worker_result, "Worker::run finished");
386        let result_obtained_at = clock_fn.now();
387        match Self::worker_result_to_execution_event(
388            locked_execution.execution_id,
389            worker_result,
390            result_obtained_at,
391            locked_execution.parent,
392            can_be_retried,
393            unlock_expiry_on_limit_reached,
394        )? {
395            Some(append) => {
396                trace!("Appending {append:?}");
397                match append {
398                    AppendOrCancel::Cancel {
399                        execution_id,
400                        cancelled_at,
401                    } => db_exec
402                        .cancel_activity_with_retries(&execution_id, cancelled_at)
403                        .await
404                        .map(|_| ()),
405                    AppendOrCancel::Other(append) => append.append(db_exec).await,
406                }
407            }
408            None => Ok(()),
409        }
410    }
411
412    /// Map the `WorkerError` to an temporary or a permanent failure.
413    fn worker_result_to_execution_event(
414        execution_id: ExecutionId,
415        worker_result: WorkerResult,
416        result_obtained_at: DateTime<Utc>,
417        parent: Option<(ExecutionId, JoinSetId)>,
418        can_be_retried: Option<Duration>,
419        unlock_expiry_on_limit_reached: Duration,
420    ) -> Result<Option<AppendOrCancel>, DbErrorWrite> {
421        Ok(match worker_result {
422            WorkerResult::Ok(result, new_version, http_client_traces) => {
423                info!("Execution finished: {result}");
424                let child_finished =
425                    parent.map(
426                        |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
427                            parent_execution_id,
428                            parent_join_set,
429                            result: result.clone(),
430                        },
431                    );
432                let primary_event = AppendRequest {
433                    created_at: result_obtained_at,
434                    event: ExecutionRequest::Finished {
435                        result,
436                        http_client_traces,
437                    },
438                };
439
440                Some(AppendOrCancel::Other(Append {
441                    created_at: result_obtained_at,
442                    primary_event,
443                    execution_id,
444                    version: new_version,
445                    child_finished,
446                }))
447            }
448            WorkerResult::DbUpdatedByWorkerOrWatcher => None,
449            WorkerResult::Err(err) => {
450                let reason_generic = err.to_string(); // Override with err's reason if no information is lost.
451
452                let (primary_event, child_finished, version) = match err {
453                    WorkerError::TemporaryTimeout {
454                        http_client_traces,
455                        version,
456                    } => {
457                        if let Some(duration) = can_be_retried {
458                            let backoff_expires_at = result_obtained_at + duration;
459                            info!(
460                                "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
461                            );
462                            (
463                                ExecutionRequest::TemporarilyTimedOut {
464                                    backoff_expires_at,
465                                    http_client_traces,
466                                },
467                                None,
468                                version,
469                            )
470                        } else {
471                            info!("Execution timed out");
472                            let result = SupportedFunctionReturnValue::ExecutionError(
473                                FinishedExecutionError {
474                                    kind: ExecutionFailureKind::TimedOut,
475                                    reason: None,
476                                    detail: None,
477                                },
478                            );
479                            let child_finished =
480                                parent.map(|(parent_execution_id, parent_join_set)| {
481                                    ChildFinishedResponse {
482                                        parent_execution_id,
483                                        parent_join_set,
484                                        result: result.clone(),
485                                    }
486                                });
487                            (
488                                ExecutionRequest::Finished {
489                                    result,
490                                    http_client_traces,
491                                },
492                                child_finished,
493                                version,
494                            )
495                        }
496                    }
497                    WorkerError::DbError(db_error) => {
498                        return Err(db_error);
499                    }
500                    WorkerError::ActivityTrap {
501                        reason: _, // reason_generic contains trap_kind + reason
502                        trap_kind,
503                        detail,
504                        version,
505                        http_client_traces,
506                    } => {
507                        if let Some(duration) = can_be_retried {
508                            let expires_at = result_obtained_at + duration;
509                            debug!(
510                                "Retrying activity with `{trap_kind}` execution after {duration:?} at {expires_at}"
511                            );
512                            (
513                                ExecutionRequest::TemporarilyFailed {
514                                    reason: StrVariant::from(reason_generic),
515                                    backoff_expires_at: expires_at,
516                                    detail,
517                                    http_client_traces,
518                                },
519                                None,
520                                version,
521                            )
522                        } else {
523                            info!(
524                                "Activity with `{trap_kind}` marked as permanent failure - {reason_generic}"
525                            );
526                            let result = SupportedFunctionReturnValue::ExecutionError(
527                                FinishedExecutionError {
528                                    reason: Some(reason_generic),
529                                    kind: ExecutionFailureKind::Uncategorized,
530                                    detail,
531                                },
532                            );
533                            let child_finished =
534                                parent.map(|(parent_execution_id, parent_join_set)| {
535                                    ChildFinishedResponse {
536                                        parent_execution_id,
537                                        parent_join_set,
538                                        result: result.clone(),
539                                    }
540                                });
541                            (
542                                ExecutionRequest::Finished {
543                                    result,
544                                    http_client_traces,
545                                },
546                                child_finished,
547                                version,
548                            )
549                        }
550                    }
551                    WorkerError::ActivityPreopenedDirError {
552                        reason,
553                        detail,
554                        version,
555                    } => {
556                        let http_client_traces = None;
557                        if let Some(duration) = can_be_retried {
558                            let expires_at = result_obtained_at + duration;
559                            debug!(
560                                "Retrying activity with ActivityPreopenedDirError `{reason}` execution after {duration:?} at {expires_at}"
561                            );
562                            (
563                                ExecutionRequest::TemporarilyFailed {
564                                    reason: StrVariant::from(reason_generic),
565                                    backoff_expires_at: expires_at,
566                                    detail: Some(detail),
567                                    http_client_traces,
568                                },
569                                None,
570                                version,
571                            )
572                        } else {
573                            info!(
574                                "Activity with ActivityPreopenedDirError `{reason}` marked as permanent failure - {reason_generic}"
575                            );
576                            let result = SupportedFunctionReturnValue::ExecutionError(
577                                FinishedExecutionError {
578                                    reason: Some(reason_generic),
579                                    kind: ExecutionFailureKind::Uncategorized,
580                                    detail: Some(detail),
581                                },
582                            );
583                            let child_finished =
584                                parent.map(|(parent_execution_id, parent_join_set)| {
585                                    ChildFinishedResponse {
586                                        parent_execution_id,
587                                        parent_join_set,
588                                        result: result.clone(),
589                                    }
590                                });
591                            (
592                                ExecutionRequest::Finished {
593                                    result,
594                                    http_client_traces,
595                                },
596                                child_finished,
597                                version,
598                            )
599                        }
600                    }
601                    WorkerError::ActivityReturnedError {
602                        detail,
603                        version,
604                        http_client_traces,
605                    } => {
606                        let duration = can_be_retried.expect(
607                            "ActivityReturnedError must not be returned when retries are exhausted",
608                        );
609                        let expires_at = result_obtained_at + duration;
610                        debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
611                        (
612                            ExecutionRequest::TemporarilyFailed {
613                                backoff_expires_at: expires_at,
614                                reason: StrVariant::Static("activity returned error"), // is same as the variant's display message.
615                                detail, // contains the backtrace
616                                http_client_traces,
617                            },
618                            None,
619                            version,
620                        )
621                    }
622                    WorkerError::LimitReached {
623                        reason,
624                        version: new_version,
625                    } => {
626                        let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
627                        warn!(
628                            "Limit reached: {reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
629                        );
630                        (
631                            ExecutionRequest::Unlocked {
632                                backoff_expires_at: expires_at,
633                                reason: StrVariant::from(reason),
634                            },
635                            None,
636                            new_version,
637                        )
638                    }
639                    WorkerError::FatalError(FatalError::Cancelled, _version) => {
640                        // use more optimized `cancel` to avoid polluting the logs with errors on a race.
641                        return Ok(Some(AppendOrCancel::Cancel {
642                            execution_id,
643                            cancelled_at: result_obtained_at,
644                        }));
645                    }
646                    WorkerError::FatalError(fatal_error, version) => {
647                        warn!("Fatal worker error - {fatal_error:?}");
648                        let result = SupportedFunctionReturnValue::ExecutionError(
649                            FinishedExecutionError::from(fatal_error),
650                        );
651                        let child_finished =
652                            parent.map(|(parent_execution_id, parent_join_set)| {
653                                ChildFinishedResponse {
654                                    parent_execution_id,
655                                    parent_join_set,
656                                    result: result.clone(),
657                                }
658                            });
659                        (
660                            ExecutionRequest::Finished {
661                                result,
662                                http_client_traces: None,
663                            },
664                            child_finished,
665                            version,
666                        )
667                    }
668                };
669                Some(AppendOrCancel::Other(Append {
670                    created_at: result_obtained_at,
671                    primary_event: AppendRequest {
672                        created_at: result_obtained_at,
673                        event: primary_event,
674                    },
675                    execution_id,
676                    version,
677                    child_finished,
678                }))
679            }
680        })
681    }
682}
683
684#[derive(Debug, Clone)]
685pub(crate) struct ChildFinishedResponse {
686    pub(crate) parent_execution_id: ExecutionId,
687    pub(crate) parent_join_set: JoinSetId,
688    pub(crate) result: SupportedFunctionReturnValue,
689}
690
691#[derive(Debug, Clone)]
692#[expect(clippy::large_enum_variant)]
693pub(crate) enum AppendOrCancel {
694    Cancel {
695        execution_id: ExecutionId,
696        cancelled_at: DateTime<Utc>,
697    },
698    Other(Append),
699}
700
701#[derive(Debug, Clone)]
702pub(crate) struct Append {
703    pub(crate) created_at: DateTime<Utc>,
704    pub(crate) primary_event: AppendRequest,
705    pub(crate) execution_id: ExecutionId,
706    pub(crate) version: Version,
707    pub(crate) child_finished: Option<ChildFinishedResponse>,
708}
709
710impl Append {
711    pub(crate) async fn append(self, db_exec: &dyn DbExecutor) -> Result<(), DbErrorWrite> {
712        if let Some(child_finished) = self.child_finished {
713            assert_matches!(
714                &self.primary_event,
715                AppendRequest {
716                    event: ExecutionRequest::Finished { .. },
717                    ..
718                }
719            );
720            let child_execution_id = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
721            let events = AppendEventsToExecution {
722                execution_id: self.execution_id,
723                version: self.version.clone(),
724                batch: vec![self.primary_event],
725            };
726            let response = AppendResponseToExecution {
727                parent_execution_id: child_finished.parent_execution_id,
728                created_at: self.created_at,
729                join_set_id: child_finished.parent_join_set,
730                child_execution_id,
731                finished_version: self.version, // Since self.primary_event is a finished event, the version will remain the same.
732                result: child_finished.result,
733            };
734
735            db_exec
736                .append_batch_respond_to_parent(events, response, self.created_at)
737                .await?;
738        } else {
739            db_exec
740                .append(self.execution_id, self.version, self.primary_event)
741                .await?;
742        }
743        Ok(())
744    }
745}
746
/// Test support: a `Worker` that replays pre-scripted results.
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes, RETURN_TYPE_DUMMY,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");
    /// Scripted steps keyed by the expected version; each step holds the expected event history
    /// and the result to return.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    /// Builds the one-element export list advertising `ffqn`.
    fn single_export(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
        [FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: RETURN_TYPE_DUMMY,
            extension: None,
            submittable: true,
        }]
    }

    /// Worker whose every `run` pops the last scripted step and checks it against the context.
    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        /// Scripted steps; `run` consumes them from the back.
        pub worker_results_rev: SimpleWorkerResultMap,
        /// Function name advertised by this worker.
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    impl SimpleWorker {
        /// Creates a worker scripted with a single step expecting version 2 and an empty
        /// event history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let scripted = IndexMap::from([(Version::new(2), (vec![], res))]);
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(scripted)))
        }

        /// Returns the same worker advertising `ffqn` instead of its previous function name.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            Self {
                exported: single_export(ffqn.clone()),
                ffqn,
                ..self
            }
        }

        /// Creates a worker advertising `FFQN_SOME` that replays `worker_results_rev`.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: single_export(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        /// Pops the last scripted step, asserts that its version and event history match `ctx`
        /// and returns the scripted result.
        ///
        /// Panics when no step is left, the mutex is poisoned, or the expectations do not match.
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            let (expected_version, (expected_eh, worker_result)) = {
                let mut scripted = self.worker_results_rev.lock().unwrap();
                scripted.pop().unwrap()
            };
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            // Only the events are compared; the version stored next to each event is ignored.
            let actual_eh: Vec<_> = ctx
                .event_history
                .iter()
                .map(|(event, _version)| event.clone())
                .collect();
            assert_eq!(expected_eh, actual_eh);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}
832
833#[cfg(test)]
834mod tests {
835    use self::simple_worker::SimpleWorker;
836    use super::*;
837    use crate::{expired_timers_watcher, worker::WorkerResult};
838    use assert_matches::assert_matches;
839    use async_trait::async_trait;
840    use concepts::storage::{
841        CreateRequest, DbConnection, JoinSetRequest, JoinSetResponse, JoinSetResponseEvent,
842    };
843    use concepts::storage::{DbPoolCloseable, LockedBy};
844    use concepts::storage::{ExecutionEvent, ExecutionRequest, HistoryEvent, PendingState};
845    use concepts::time::Now;
846    use concepts::{
847        FunctionMetadata, JoinSetKind, ParameterTypes, Params, RETURN_TYPE_DUMMY,
848        SUPPORTED_RETURN_VALUE_OK_EMPTY, StrVariant, SupportedFunctionReturnValue, TrapKind,
849    };
850    use db_tests::Database;
851    use indexmap::IndexMap;
852    use rstest::rstest;
853    use simple_worker::FFQN_SOME;
854    use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
855    use test_utils::set_up;
856    use test_utils::sim_clock::{ConstClock, SimClock};
857
858    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
859
860    async fn tick_fn<W: Worker + Debug, C: ClockFn + 'static>(
861        config: ExecConfig,
862        clock_fn: C,
863        db_exec: Arc<dyn DbExecutor>,
864        worker: Arc<W>,
865        executed_at: DateTime<Utc>,
866    ) -> Vec<ExecutionId> {
867        trace!("Ticking with {worker:?}");
868        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
869        let executor = ExecTask::new_test(worker, config, clock_fn, db_exec, ffqns);
870        executor
871            .tick_test_await(executed_at, RunId::generate())
872            .await
873    }
874
875    #[rstest]
876    #[tokio::test]
877    async fn execute_simple_lifecycle_tick_based_mem(
878        #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentId)]
879        locking_strategy: LockingStrategy,
880    ) {
881        let created_at = Now.now();
882        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
883        execute_simple_lifecycle_tick_based(
884            db_pool.connection().as_ref(),
885            db_exec.clone(),
886            ConstClock(created_at),
887            locking_strategy,
888        )
889        .await;
890        db_close.close().await;
891    }
892
893    #[rstest]
894    #[tokio::test]
895    async fn execute_simple_lifecycle_tick_based_sqlite(
896        #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentId)]
897        locking_strategy: LockingStrategy,
898    ) {
899        let created_at = Now.now();
900        let (_guard, db_pool, db_exec, db_close) = Database::Sqlite.set_up().await;
901        execute_simple_lifecycle_tick_based(
902            db_pool.connection().as_ref(),
903            db_exec.clone(),
904            ConstClock(created_at),
905            locking_strategy,
906        )
907        .await;
908        db_close.close().await;
909    }
910
911    async fn execute_simple_lifecycle_tick_based<C: ClockFn + 'static>(
912        db_connection: &dyn DbConnection,
913        db_exec: Arc<dyn DbExecutor>,
914        clock_fn: C,
915        locking_strategy: LockingStrategy,
916    ) {
917        set_up();
918        let created_at = clock_fn.now();
919        let exec_config = ExecConfig {
920            batch_size: 1,
921            lock_expiry: Duration::from_secs(1),
922            tick_sleep: Duration::from_millis(100),
923            component_id: ComponentId::dummy_activity(),
924            task_limiter: None,
925            executor_id: ExecutorId::generate(),
926            retry_config: ComponentRetryConfig::ZERO,
927            locking_strategy,
928        };
929
930        let execution_log = create_and_tick(
931            CreateAndTickConfig {
932                execution_id: ExecutionId::generate(),
933                created_at,
934                executed_at: created_at,
935            },
936            clock_fn,
937            db_connection,
938            db_exec,
939            exec_config,
940            Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
941                SUPPORTED_RETURN_VALUE_OK_EMPTY,
942                Version::new(2),
943                None,
944            ))),
945            tick_fn,
946        )
947        .await;
948        assert_matches!(
949            execution_log.events.get(2).unwrap(),
950            ExecutionEvent {
951                event: ExecutionRequest::Finished {
952                    result: SupportedFunctionReturnValue::Ok { ok: None },
953                    http_client_traces: None
954                },
955                created_at: _,
956                backtrace_id: None,
957                version: Version(2),
958            }
959        );
960    }
961
962    #[tokio::test]
963    async fn execute_simple_lifecycle_task_based_mem() {
964        set_up();
965        let created_at = Now.now();
966        let clock_fn = ConstClock(created_at);
967        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
968        let exec_config = ExecConfig {
969            batch_size: 1,
970            lock_expiry: Duration::from_secs(1),
971            tick_sleep: Duration::ZERO,
972            component_id: ComponentId::dummy_activity(),
973            task_limiter: None,
974            executor_id: ExecutorId::generate(),
975            retry_config: ComponentRetryConfig::ZERO,
976            locking_strategy: LockingStrategy::default(),
977        };
978
979        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
980            SUPPORTED_RETURN_VALUE_OK_EMPTY,
981            Version::new(2),
982            None,
983        )));
984
985        let execution_log = create_and_tick(
986            CreateAndTickConfig {
987                execution_id: ExecutionId::generate(),
988                created_at,
989                executed_at: created_at,
990            },
991            clock_fn,
992            db_pool.connection().as_ref(),
993            db_exec,
994            exec_config,
995            worker,
996            tick_fn,
997        )
998        .await;
999        assert_matches!(
1000            execution_log.events.get(2).unwrap(),
1001            ExecutionEvent {
1002                event: ExecutionRequest::Finished {
1003                    result: SupportedFunctionReturnValue::Ok { ok: None },
1004                    http_client_traces: None
1005                },
1006                created_at: _,
1007                backtrace_id: None,
1008                version: Version(2),
1009            }
1010        );
1011        db_close.close().await;
1012    }
1013
    /// Inputs of `create_and_tick` describing the execution to create and when to tick.
    struct CreateAndTickConfig {
        /// Id of the execution that gets created and later looked up.
        execution_id: ExecutionId,
        /// Used as both `created_at` and `scheduled_at` of the create request.
        created_at: DateTime<Utc>,
        /// Timestamp passed to the tick function.
        executed_at: DateTime<Utc>,
    }
1019
1020    async fn create_and_tick<
1021        W: Worker,
1022        C: ClockFn,
1023        T: FnMut(ExecConfig, C, Arc<dyn DbExecutor>, Arc<W>, DateTime<Utc>) -> F,
1024        F: Future<Output = Vec<ExecutionId>>,
1025    >(
1026        config: CreateAndTickConfig,
1027        clock_fn: C,
1028        db_connection: &dyn DbConnection,
1029        db_exec: Arc<dyn DbExecutor>,
1030        exec_config: ExecConfig,
1031        worker: Arc<W>,
1032        mut tick: T,
1033    ) -> ExecutionLog {
1034        // Create an execution
1035        db_connection
1036            .create(CreateRequest {
1037                created_at: config.created_at,
1038                execution_id: config.execution_id.clone(),
1039                ffqn: FFQN_SOME,
1040                params: Params::empty(),
1041                parent: None,
1042                metadata: concepts::ExecutionMetadata::empty(),
1043                scheduled_at: config.created_at,
1044                component_id: ComponentId::dummy_activity(),
1045                scheduled_by: None,
1046            })
1047            .await
1048            .unwrap();
1049        // execute!
1050        tick(exec_config, clock_fn, db_exec, worker, config.executed_at).await;
1051        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
1052        debug!("Execution history after tick: {execution_log:?}");
1053        // check that DB contains Created and Locked events.
1054        assert_matches!(
1055            execution_log.events.first().unwrap(),
1056            ExecutionEvent {
1057                event: ExecutionRequest::Created { .. },
1058                created_at: actually_created_at,
1059                backtrace_id: None,
1060                version: Version(0),
1061            }
1062            if config.created_at == *actually_created_at
1063        );
1064        let locked_at = assert_matches!(
1065            execution_log.events.get(1).unwrap(),
1066            ExecutionEvent {
1067                event: ExecutionRequest::Locked { .. },
1068                created_at: locked_at,
1069                backtrace_id: None,
1070                version: Version(1),
1071            } if config.created_at <= *locked_at
1072            => *locked_at
1073        );
1074        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
1075            event: _,
1076            created_at: executed_at,
1077            backtrace_id: None,
1078            version: Version(2),
1079        } if *executed_at >= locked_at);
1080        execution_log
1081    }
1082
    /// With one retry allowed, an activity trap must be recorded as `TemporarilyFailed` with a
    /// backoff; the execution is picked up again only after the backoff expires and then
    /// finishes successfully.
    #[tokio::test]
    async fn activity_trap_should_trigger_an_execution_retry() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
        let retry_exp_backoff = Duration::from_millis(100);
        let retry_config = ComponentRetryConfig {
            max_retries: 1,
            retry_exp_backoff,
        };
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
            retry_config,
            locking_strategy: LockingStrategy::default(),
        };
        let expected_reason = "error reason";
        let expected_detail = "error detail";
        // First run: the worker is scripted to trap at version 2.
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: Some(expected_detail.to_string()),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at: sim_clock.now(),
                executed_at: sim_clock.now(),
            },
            sim_clock.clone(),
            db_pool.connection().as_ref(),
            db_exec.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        // Created, Locked and the temporary failure.
        assert_eq!(3, execution_log.events.len());
        {
            let (reason, detail, at, expires_at) = assert_matches!(
                &execution_log.events.get(2).unwrap(),
                ExecutionEvent {
                    event: ExecutionRequest::TemporarilyFailed {
                        reason,
                        detail,
                        backoff_expires_at,
                        http_client_traces: None,
                    },
                    created_at: at,
                    backtrace_id: None,
                    version: Version(2),
                }
                => (reason, detail, *at, *backoff_expires_at)
            );
            assert_eq!(format!("activity trap: {expected_reason}"), reason.deref());
            assert_eq!(Some(expected_detail), detail.as_deref());
            assert_eq!(at, sim_clock.now());
            // The simulated clock has not moved, so the backoff ends exactly one
            // `retry_exp_backoff` from now.
            assert_eq!(sim_clock.now() + retry_config.retry_exp_backoff, expires_at);
        }
        // Second run: the retry is scripted to succeed, expecting version 4
        // (version 3 is asserted below to be the second `Locked` event).
        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
            std::sync::Mutex::new(IndexMap::from([(
                Version::new(4),
                (
                    vec![],
                    WorkerResult::Ok(SUPPORTED_RETURN_VALUE_OK_EMPTY, Version::new(4), None),
                ),
            )])),
        )));
        // noop until `retry_exp_backoff` expires
        assert!(
            tick_fn(
                exec_config.clone(),
                sim_clock.clone(),
                db_exec.clone(),
                worker.clone(),
                sim_clock.now(),
            )
            .await
            .is_empty()
        );
        // tick again to finish the execution
        sim_clock.move_time_forward(retry_config.retry_exp_backoff);
        tick_fn(
            exec_config,
            sim_clock.clone(),
            db_exec.clone(),
            worker,
            sim_clock.now(),
        )
        .await;
        let execution_log = {
            let db_connection = db_pool.connection();
            db_connection
                .get(&execution_log.execution_id)
                .await
                .unwrap()
        };
        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
        // The retry locks the execution again at the moved-forward time...
        assert_matches!(
            execution_log.events.get(3).unwrap(),
            ExecutionEvent {
                event: ExecutionRequest::Locked { .. },
                created_at: at,
                backtrace_id: None,
                version: Version(3),
            } if *at == sim_clock.now()
        );
        // ...and finishes it with an empty `Ok`.
        assert_matches!(
            execution_log.events.get(4).unwrap(),
            ExecutionEvent {
                event: ExecutionRequest::Finished {
                    result: SupportedFunctionReturnValue::Ok{ok:None},
                    http_client_traces: None
                },
                created_at: finished_at,
                backtrace_id: None,
                version: Version(4),
            } if *finished_at == sim_clock.now()
        );
        db_close.close().await;
    }
1213
1214    #[tokio::test]
1215    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1216        set_up();
1217        let created_at = Now.now();
1218        let clock_fn = ConstClock(created_at);
1219        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1220        let exec_config = ExecConfig {
1221            batch_size: 1,
1222            lock_expiry: Duration::from_secs(1),
1223            tick_sleep: Duration::ZERO,
1224            component_id: ComponentId::dummy_activity(),
1225            task_limiter: None,
1226            executor_id: ExecutorId::generate(),
1227            retry_config: ComponentRetryConfig::ZERO,
1228            locking_strategy: LockingStrategy::default(),
1229        };
1230
1231        let reason = "error reason";
1232        let expected_reason = format!("activity trap: {reason}");
1233        let expected_detail = "error detail";
1234        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1235            WorkerError::ActivityTrap {
1236                reason: reason.to_string(),
1237                trap_kind: concepts::TrapKind::Trap,
1238                detail: Some(expected_detail.to_string()),
1239                version: Version::new(2),
1240                http_client_traces: None,
1241            },
1242        )));
1243        let execution_log = create_and_tick(
1244            CreateAndTickConfig {
1245                execution_id: ExecutionId::generate(),
1246                created_at,
1247                executed_at: created_at,
1248            },
1249            clock_fn,
1250            db_pool.connection().as_ref(),
1251            db_exec,
1252            exec_config.clone(),
1253            worker,
1254            tick_fn,
1255        )
1256        .await;
1257        assert_eq!(3, execution_log.events.len());
1258        let (reason, kind, detail) = assert_matches!(
1259            &execution_log.events.get(2).unwrap(),
1260            ExecutionEvent {
1261                event: ExecutionRequest::Finished{
1262                    result: SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError{reason, kind, detail}),
1263                    http_client_traces: None
1264                },
1265                created_at: at,
1266                backtrace_id: None,
1267                version: Version(2),
1268            } if *at == created_at
1269            => (reason, kind, detail)
1270        );
1271
1272        assert_eq!(Some(expected_reason), *reason);
1273        assert_eq!(Some(expected_detail), detail.as_deref());
1274        assert_eq!(ExecutionFailureKind::Uncategorized, *kind);
1275
1276        db_close.close().await;
1277    }
1278
1279    #[tokio::test]
1280    async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1281        let worker_error = WorkerError::ActivityTrap {
1282            reason: "error reason".to_string(),
1283            trap_kind: TrapKind::Trap,
1284            detail: Some("detail".to_string()),
1285            version: Version::new(2),
1286            http_client_traces: None,
1287        };
1288        let expected_child_err = FinishedExecutionError {
1289            kind: ExecutionFailureKind::Uncategorized,
1290            reason: Some("activity trap: error reason".to_string()),
1291            detail: Some("detail".to_string()),
1292        };
1293        child_execution_permanently_failed_should_notify_parent(
1294            WorkerResult::Err(worker_error),
1295            expected_child_err,
1296        )
1297        .await;
1298    }
1299
1300    #[tokio::test]
1301    async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1302        let expected_child_err = FinishedExecutionError {
1303            kind: ExecutionFailureKind::TimedOut,
1304            reason: None,
1305            detail: None,
1306        };
1307        child_execution_permanently_failed_should_notify_parent(
1308            WorkerResult::DbUpdatedByWorkerOrWatcher,
1309            expected_child_err,
1310        )
1311        .await;
1312    }
1313
    /// Shared scenario: a parent execution is locked, a child execution is created on its
    /// behalf, and the child is run with a worker returning `worker_result`.
    ///
    /// Asserts that the child finishes with `expected_child_err`, that the parent becomes
    /// pending again, and that the parent's last response is a `ChildExecutionFinished` for
    /// this child carrying an execution error.
    async fn child_execution_permanently_failed_should_notify_parent(
        worker_result: WorkerResult,
        expected_child_err: FinishedExecutionError,
    ) {
        use concepts::storage::JoinSetResponseEventOuter;
        const LOCK_EXPIRY: Duration = Duration::from_secs(1);

        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;

        // The parent worker reports that the database was already updated, so the executor
        // itself appends nothing for the parent.
        let parent_worker = Arc::new(SimpleWorker::with_single_result(
            WorkerResult::DbUpdatedByWorkerOrWatcher,
        ));
        let parent_execution_id = ExecutionId::generate();
        db_pool
            .connection()
            .create(CreateRequest {
                created_at: sim_clock.now(),
                execution_id: parent_execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();
        // Remembered so the parent's `last_lock` can be checked at the end.
        let parent_executor_id = ExecutorId::generate();
        // Run the parent once.
        tick_fn(
            ExecConfig {
                batch_size: 1,
                lock_expiry: LOCK_EXPIRY,
                tick_sleep: Duration::ZERO,
                component_id: ComponentId::dummy_activity(),
                task_limiter: None,
                executor_id: parent_executor_id,
                retry_config: ComponentRetryConfig::ZERO,
                locking_strategy: LockingStrategy::default(),
            },
            sim_clock.clone(),
            db_exec.clone(),
            parent_worker,
            sim_clock.now(),
        )
        .await;

        let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
        let child_execution_id = parent_execution_id.next_level(&join_set_id);
        // executor does not append anything, this should have been written by the worker:
        {
            let params = Params::empty();
            let child = CreateRequest {
                created_at: sim_clock.now(),
                execution_id: ExecutionId::Derived(child_execution_id.clone()),
                ffqn: FFQN_CHILD,
                params: params.clone(),
                parent: Some((parent_execution_id.clone(), join_set_id.clone())),
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            };
            let current_time = sim_clock.now();
            let join_set = AppendRequest {
                created_at: current_time,
                event: ExecutionRequest::HistoryEvent {
                    event: HistoryEvent::JoinSetCreate {
                        join_set_id: join_set_id.clone(),
                    },
                },
            };
            let child_exec_req = AppendRequest {
                created_at: current_time,
                event: ExecutionRequest::HistoryEvent {
                    event: HistoryEvent::JoinSetRequest {
                        join_set_id: join_set_id.clone(),
                        request: JoinSetRequest::ChildExecutionRequest {
                            child_execution_id: child_execution_id.clone(),
                            target_ffqn: FFQN_CHILD,
                            params,
                        },
                    },
                },
            };
            let join_next = AppendRequest {
                created_at: current_time,
                event: ExecutionRequest::HistoryEvent {
                    event: HistoryEvent::JoinNext {
                        join_set_id: join_set_id.clone(),
                        run_expires_at: sim_clock.now(),
                        closing: false,
                        requested_ffqn: Some(FFQN_CHILD),
                    },
                },
            };
            // Append the three history events to the parent (at version 2) and create the
            // child in the same call.
            db_pool
                .connection()
                .append_batch_create_new_execution(
                    current_time,
                    vec![join_set, child_exec_req, join_next],
                    parent_execution_id.clone(),
                    Version::new(2),
                    vec![child],
                )
                .await
                .unwrap();
        }

        let child_worker =
            Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));

        // execute the child
        tick_fn(
            ExecConfig {
                batch_size: 1,
                lock_expiry: LOCK_EXPIRY,
                tick_sleep: Duration::ZERO,
                component_id: ComponentId::dummy_activity(),
                task_limiter: None,
                executor_id: ExecutorId::generate(),
                retry_config: ComponentRetryConfig::ZERO,
                locking_strategy: LockingStrategy::default(),
            },
            sim_clock.clone(),
            db_exec.clone(),
            child_worker,
            sim_clock.now(),
        )
        .await;
        if matches!(expected_child_err.kind, ExecutionFailureKind::TimedOut) {
            // In case of timeout, let the timers watcher handle it
            sim_clock.move_time_forward(LOCK_EXPIRY);
            expired_timers_watcher::tick(db_pool.connection().as_ref(), sim_clock.now())
                .await
                .unwrap();
        }
        // The child must be finished with exactly the expected error.
        let child_log = db_pool
            .connection()
            .get(&ExecutionId::Derived(child_execution_id.clone()))
            .await
            .unwrap();
        assert!(child_log.pending_state.is_finished());
        assert_eq!(
            Version(2),
            child_log.next_version,
            "created = 0, locked = 1, with_single_result = 2"
        );
        assert_eq!(
            ExecutionRequest::Finished {
                result: SupportedFunctionReturnValue::ExecutionError(expected_child_err),
                http_client_traces: None
            },
            child_log.last_event().event
        );
        // The parent must be pending again, still recording the executor that last locked it.
        let parent_log = db_pool
            .connection()
            .get(&parent_execution_id)
            .await
            .unwrap();
        assert_matches!(
            parent_log.pending_state,
            PendingState::PendingAt {
                scheduled_at,
                last_lock: Some(LockedBy { executor_id: found_executor_id, run_id: _}),
                component_id_input_digest: _
            } if scheduled_at == sim_clock.now() && found_executor_id == parent_executor_id,
            "parent should be back to pending"
        );
        // The parent's last response must describe this child's completion.
        let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
            parent_log.responses.last(),
            Some(JoinSetResponseEventOuter{
                created_at: at,
                event: JoinSetResponseEvent{
                    join_set_id: found_join_set_id,
                    event: JoinSetResponse::ChildExecutionFinished {
                        child_execution_id: found_child_execution_id,
                        finished_version,
                        result: found_result,
                    }
                }
            })
             if *at == sim_clock.now()
            => (found_join_set_id, found_child_execution_id, finished_version, found_result)
        );
        assert_eq!(join_set_id, *found_join_set_id);
        assert_eq!(child_execution_id, *found_child_execution_id);
        assert_eq!(child_log.next_version, *child_finished_version);
        // Only the variant is checked here, not the error payload.
        assert_matches!(
            found_result,
            SupportedFunctionReturnValue::ExecutionError(_)
        );

        db_close.close().await;
    }
1511
    /// Test worker that sleeps for `duration` and then returns `result`.
    #[derive(Clone, Debug)]
    struct SleepyWorker {
        /// How long `run` sleeps before returning.
        duration: Duration,
        /// Value returned (cloned) by every `run`.
        result: SupportedFunctionReturnValue,
        /// Functions reported by `exported_functions`.
        exported: [FunctionMetadata; 1],
    }
1518
1519    #[async_trait]
1520    impl Worker for SleepyWorker {
1521        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1522            tokio::time::sleep(self.duration).await;
1523            WorkerResult::Ok(self.result.clone(), ctx.version, None)
1524        }
1525
1526        fn exported_functions(&self) -> &[FunctionMetadata] {
1527            &self.exported
1528        }
1529    }
1530
1531    #[tokio::test]
1532    async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1533        set_up();
1534        let sim_clock = SimClock::default();
1535        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1536        let lock_expiry = Duration::from_millis(100);
1537        let timeout_duration = Duration::from_millis(300);
1538        let retry_config = ComponentRetryConfig {
1539            max_retries: 1,
1540            retry_exp_backoff: timeout_duration,
1541        };
1542        let exec_config = ExecConfig {
1543            batch_size: 1,
1544            lock_expiry,
1545            tick_sleep: Duration::ZERO,
1546            component_id: ComponentId::dummy_activity(),
1547            task_limiter: None,
1548            executor_id: ExecutorId::generate(),
1549            retry_config,
1550            locking_strategy: LockingStrategy::default(),
1551        };
1552
1553        let worker = Arc::new(SleepyWorker {
1554            duration: lock_expiry + Duration::from_millis(1), // sleep more than allowed by the lock expiry
1555            result: SUPPORTED_RETURN_VALUE_OK_EMPTY,
1556            exported: [FunctionMetadata {
1557                ffqn: FFQN_SOME,
1558                parameter_types: ParameterTypes::default(),
1559                return_type: RETURN_TYPE_DUMMY,
1560                extension: None,
1561                submittable: true,
1562            }],
1563        });
1564        // Create an execution
1565        let execution_id = ExecutionId::generate();
1566        let db_connection = db_pool.connection();
1567        db_connection
1568            .create(CreateRequest {
1569                created_at: sim_clock.now(),
1570                execution_id: execution_id.clone(),
1571                ffqn: FFQN_SOME,
1572                params: Params::empty(),
1573                parent: None,
1574                metadata: concepts::ExecutionMetadata::empty(),
1575                scheduled_at: sim_clock.now(),
1576                component_id: ComponentId::dummy_activity(),
1577                scheduled_by: None,
1578            })
1579            .await
1580            .unwrap();
1581
1582        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
1583        let executor = ExecTask::new_test(
1584            worker,
1585            exec_config.clone(),
1586            sim_clock.clone(),
1587            db_exec.clone(),
1588            ffqns,
1589        );
1590        let mut first_execution_progress = executor
1591            .tick(sim_clock.now(), RunId::generate())
1592            .await
1593            .unwrap();
1594        assert_eq!(1, first_execution_progress.executions.len());
1595        // Started hanging, wait for lock expiry.
1596        sim_clock.move_time_forward(lock_expiry);
1597        // cleanup should be called
1598        let now_after_first_lock_expiry = sim_clock.now();
1599        {
1600            debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1601            let cleanup_progress = executor
1602                .tick(now_after_first_lock_expiry, RunId::generate())
1603                .await
1604                .unwrap();
1605            assert!(cleanup_progress.executions.is_empty());
1606        }
1607        {
1608            let expired_locks = expired_timers_watcher::tick(
1609                db_pool.connection().as_ref(),
1610                now_after_first_lock_expiry,
1611            )
1612            .await
1613            .unwrap()
1614            .expired_locks;
1615            assert_eq!(1, expired_locks);
1616        }
1617        assert!(
1618            !first_execution_progress
1619                .executions
1620                .pop()
1621                .unwrap()
1622                .1
1623                .is_finished()
1624        );
1625
1626        let execution_log = db_connection.get(&execution_id).await.unwrap();
1627        let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1628        assert_matches!(
1629            &execution_log.events.get(2).unwrap(),
1630            ExecutionEvent {
1631                event: ExecutionRequest::TemporarilyTimedOut { backoff_expires_at, .. },
1632                created_at: at,
1633                backtrace_id: None,
1634                version: Version(2),
1635            } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1636        );
1637        assert_matches!(
1638            execution_log.pending_state,
1639            PendingState::PendingAt {
1640                scheduled_at: found_scheduled_by,
1641                last_lock: Some(LockedBy {
1642                    executor_id: found_executor_id,
1643                    run_id: _,
1644                }),
1645                component_id_input_digest: _
1646            } if found_scheduled_by == expected_first_timeout_expiry && found_executor_id == exec_config.executor_id
1647        );
1648        sim_clock.move_time_forward(timeout_duration);
1649        let now_after_first_timeout = sim_clock.now();
1650        debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1651
1652        let mut second_execution_progress = executor
1653            .tick(now_after_first_timeout, RunId::generate())
1654            .await
1655            .unwrap();
1656        assert_eq!(1, second_execution_progress.executions.len());
1657
1658        // Started hanging, wait for lock expiry.
1659        sim_clock.move_time_forward(lock_expiry);
1660        // cleanup should be called
1661        let now_after_second_lock_expiry = sim_clock.now();
1662        debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1663        {
1664            let cleanup_progress = executor
1665                .tick(now_after_second_lock_expiry, RunId::generate())
1666                .await
1667                .unwrap();
1668            assert!(cleanup_progress.executions.is_empty());
1669        }
1670        {
1671            let expired_locks = expired_timers_watcher::tick(
1672                db_pool.connection().as_ref(),
1673                now_after_second_lock_expiry,
1674            )
1675            .await
1676            .unwrap()
1677            .expired_locks;
1678            assert_eq!(1, expired_locks);
1679        }
1680        assert!(
1681            !second_execution_progress
1682                .executions
1683                .pop()
1684                .unwrap()
1685                .1
1686                .is_finished()
1687        );
1688
1689        drop(db_connection);
1690        drop(executor);
1691        db_close.close().await;
1692    }
1693}