obeli_sk_executor/
executor.rs

1use crate::worker::{FatalError, Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::prefixed_ulid::RunId;
5use concepts::storage::{
6    AppendEventsToExecution, AppendRequest, AppendResponseToExecution, DbErrorGeneric,
7    DbErrorWrite, DbExecutor, DbPool, ExecutionLog, LockedExecution,
8};
9use concepts::time::{ClockFn, Sleep};
10use concepts::{
11    ComponentId, ComponentRetryConfig, FunctionMetadata, StrVariant, SupportedFunctionReturnValue,
12};
13use concepts::{ExecutionFailureKind, JoinSetId};
14use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
15use concepts::{
16    FinishedExecutionError,
17    storage::{ExecutionRequest, Version},
18};
19use std::{
20    sync::{
21        Arc,
22        atomic::{AtomicBool, Ordering},
23    },
24    time::Duration,
25};
26use tokio::task::{AbortHandle, JoinHandle};
27use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
28
29#[derive(Debug, Clone)]
30pub struct ExecConfig {
31    pub lock_expiry: Duration,
32    pub tick_sleep: Duration,
33    pub batch_size: u32,
34    pub component_id: ComponentId,
35    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
36    pub executor_id: ExecutorId,
37    pub retry_config: ComponentRetryConfig,
38    pub locking_strategy: LockingStrategy,
39}
40
41pub struct ExecTask<C: ClockFn> {
42    worker: Arc<dyn Worker>,
43    config: ExecConfig,
44    clock_fn: C, // Used for obtaining current time when the execution finishes.
45    db_pool: Arc<dyn DbPool>,
46    locking_strategy_holder: LockingStrategyHolder,
47}
48
49#[derive(derive_more::Debug, Default)]
50pub struct ExecutionProgress {
51    #[debug(skip)]
52    #[allow(dead_code)]
53    executions: Vec<(ExecutionId, JoinHandle<()>)>,
54}
55
56impl ExecutionProgress {
57    #[cfg(feature = "test")]
58    pub async fn wait_for_tasks(self) -> Vec<ExecutionId> {
59        let mut vec = Vec::new();
60        for (exe, join_handle) in self.executions {
61            vec.push(exe);
62            join_handle.await.unwrap();
63        }
64        vec
65    }
66}
67
68#[derive(derive_more::Debug)]
69pub struct ExecutorTaskHandle {
70    #[debug(skip)]
71    is_closing: Arc<AtomicBool>,
72    #[debug(skip)]
73    abort_handle: AbortHandle,
74    component_id: ComponentId,
75    executor_id: ExecutorId,
76}
77
78impl ExecutorTaskHandle {
79    #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
80    pub async fn close(&self) {
81        trace!("Gracefully closing");
82        // TODO: Close all the workers tasks as well.
83        // Simple solution:
84        // Attempt to unlock all in-progress executions. Activities will not be penalized for shutdown, although the number of retries would increase,
85        // some progress will be lost.
86        // More complex solution:
87        // Activities will be awaited, up to some deadline, then unlocked.
88        // Workflows that are awating should be signaled using `DeadlineTracker`.
89        self.is_closing.store(true, Ordering::Relaxed);
90        while !self.abort_handle.is_finished() {
91            tokio::time::sleep(Duration::from_millis(1)).await;
92        }
93        debug!("Gracefully closed");
94    }
95}
96
97impl Drop for ExecutorTaskHandle {
98    #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
99    fn drop(&mut self) {
100        if self.abort_handle.is_finished() {
101            return;
102        }
103        warn!("Aborting the executor task");
104        self.abort_handle.abort();
105    }
106}
107
108#[cfg(feature = "test")]
109pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
110    extract_exported_ffqns_noext(worker)
111}
112
113fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
114    worker
115        .exported_functions()
116        .iter()
117        .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
118        .collect::<Arc<_>>()
119}
120
121#[derive(Debug, Default, Clone, Copy)]
122pub enum LockingStrategy {
123    #[default]
124    ByFfqns,
125    ByComponentId,
126}
127impl LockingStrategy {
128    fn holder(&self, ffqns: Arc<[FunctionFqn]>) -> LockingStrategyHolder {
129        match self {
130            LockingStrategy::ByFfqns => LockingStrategyHolder::ByFfqns(ffqns),
131            LockingStrategy::ByComponentId => LockingStrategyHolder::ByComponentId,
132        }
133    }
134}
135
136enum LockingStrategyHolder {
137    ByFfqns(Arc<[FunctionFqn]>),
138    ByComponentId,
139}
140
141impl<C: ClockFn + 'static> ExecTask<C> {
142    #[cfg(feature = "test")]
143    pub fn new_test(
144        worker: Arc<dyn Worker>,
145        config: ExecConfig,
146        clock_fn: C,
147        db_pool: Arc<dyn DbPool>,
148        ffqns: Arc<[FunctionFqn]>,
149    ) -> Self {
150        Self {
151            worker,
152            locking_strategy_holder: config.locking_strategy.holder(ffqns),
153            config,
154            clock_fn,
155            db_pool,
156        }
157    }
158
159    #[cfg(feature = "test")]
160    pub fn new_all_ffqns_test(
161        worker: Arc<dyn Worker>,
162        config: ExecConfig,
163        clock_fn: C,
164        db_pool: Arc<dyn DbPool>,
165    ) -> Self {
166        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
167        Self {
168            worker,
169            locking_strategy_holder: config.locking_strategy.holder(ffqns),
170            config,
171            clock_fn,
172            db_pool,
173        }
174    }
175
176    pub fn spawn_new(
177        worker: Arc<dyn Worker>,
178        config: ExecConfig,
179        clock_fn: C,
180        db_pool: Arc<dyn DbPool>,
181        sleep: impl Sleep + 'static,
182    ) -> ExecutorTaskHandle {
183        let is_closing = Arc::new(AtomicBool::default());
184        let is_closing_inner = is_closing.clone();
185        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
186        let component_id = config.component_id.clone();
187        let executor_id = config.executor_id;
188        let abort_handle = tokio::spawn(async move {
189            debug!(executor_id = %config.executor_id, component_id = %config.component_id, "Spawned executor");
190            let lock_strategy_holder = config.locking_strategy.holder(ffqns);
191            let task = ExecTask {
192                worker,
193                config,
194                db_pool,
195                locking_strategy_holder: lock_strategy_holder,
196                clock_fn: clock_fn.clone(),
197            };
198            let mut old_err = None;
199            while !is_closing_inner.load(Ordering::Relaxed) {
200                let res = task.db_pool.db_exec_conn().await;
201                let res = log_err_if_new(res, &mut old_err);
202                if let Ok(db_exec) = res {
203                    let _ = task.tick(db_exec.as_ref(), clock_fn.now(), RunId::generate()).await;
204                    db_exec
205                        .wait_for_pending_by_component_id(clock_fn.now(), &task.config.component_id, {
206                            let sleep = sleep.clone();
207                            Box::pin(async move { sleep.sleep(task.config.tick_sleep).await })})
208                        .await;
209                } else  {
210                    sleep.sleep(task.config.tick_sleep).await;
211                }
212            }
213        })
214        .abort_handle();
215        ExecutorTaskHandle {
216            is_closing,
217            abort_handle,
218            component_id,
219            executor_id,
220        }
221    }
222
223    fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
224        if let Some(task_limiter) = &self.config.task_limiter {
225            let mut locks = Vec::new();
226            for _ in 0..self.config.batch_size {
227                if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
228                    locks.push(Some(permit));
229                } else {
230                    break;
231                }
232            }
233            locks
234        } else {
235            let mut vec = Vec::with_capacity(self.config.batch_size as usize);
236            for _ in 0..self.config.batch_size {
237                vec.push(None);
238            }
239            vec
240        }
241    }
242
243    #[cfg(feature = "test")]
244    pub async fn tick_test(&self, executed_at: DateTime<Utc>, run_id: RunId) -> ExecutionProgress {
245        let db_exec = self.db_pool.db_exec_conn().await.unwrap();
246        self.tick(db_exec.as_ref(), executed_at, run_id)
247            .await
248            .unwrap()
249    }
250
251    #[cfg(feature = "test")]
252    pub async fn tick_test_await(
253        &self,
254        executed_at: DateTime<Utc>,
255        run_id: RunId,
256    ) -> Vec<ExecutionId> {
257        let db_exec = self.db_pool.db_exec_conn().await.unwrap();
258        self.tick(db_exec.as_ref(), executed_at, run_id)
259            .await
260            .unwrap()
261            .wait_for_tasks()
262            .await
263    }
264
265    #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.config.executor_id, component_id = %self.config.component_id))]
266    async fn tick(
267        &self,
268        db_exec: &dyn DbExecutor,
269        executed_at: DateTime<Utc>,
270        run_id: RunId,
271    ) -> Result<ExecutionProgress, DbErrorGeneric> {
272        let locked_executions = {
273            let mut permits = self.acquire_task_permits();
274            if permits.is_empty() {
275                return Ok(ExecutionProgress::default());
276            }
277            let lock_expires_at = executed_at + self.config.lock_expiry;
278            let batch_size = u32::try_from(permits.len()).expect("ExecConfig.batch_size is u32");
279            let locked_executions = match &self.locking_strategy_holder {
280                LockingStrategyHolder::ByFfqns(ffqns) => {
281                    db_exec
282                        .lock_pending_by_ffqns(
283                            batch_size,
284                            executed_at, // fetch expiring before now
285                            ffqns.clone(),
286                            executed_at, // created at
287                            self.config.component_id.clone(),
288                            self.config.executor_id,
289                            lock_expires_at,
290                            run_id,
291                            self.config.retry_config,
292                        )
293                        .await?
294                }
295                LockingStrategyHolder::ByComponentId => {
296                    db_exec
297                        .lock_pending_by_component_id(
298                            batch_size,
299                            executed_at, // pending_at_or_sooner
300                            &self.config.component_id,
301                            executed_at, // created at
302                            self.config.executor_id,
303                            lock_expires_at,
304                            run_id,
305                            self.config.retry_config,
306                        )
307                        .await?
308                }
309            };
310            // Drop permits if too many were allocated.
311            while permits.len() > locked_executions.len() {
312                permits.pop();
313            }
314            assert_eq!(permits.len(), locked_executions.len());
315            locked_executions.into_iter().zip(permits)
316        };
317
318        let mut executions = Vec::with_capacity(locked_executions.len());
319        for (locked_execution, permit) in locked_executions {
320            let execution_id = locked_execution.execution_id.clone();
321            let join_handle = {
322                let worker = self.worker.clone();
323                let db_pool = self.db_pool.clone();
324                let clock_fn = self.clock_fn.clone();
325                let worker_span = info_span!(parent: None, "worker",
326                    "otel.name" = format!("worker {}", locked_execution.ffqn),
327                    %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.config.executor_id, component_id = %self.config.component_id);
328                locked_execution.metadata.enrich(&worker_span);
329                tokio::spawn({
330                    let worker_span2 = worker_span.clone();
331                    let retry_config = self.config.retry_config;
332                    async move {
333                        let _permit = permit;
334                        let res = Self::run_worker(
335                            worker,
336                            db_pool.as_ref(),
337                            clock_fn,
338                            locked_execution,
339                            retry_config,
340                            worker_span2,
341                        )
342                        .await;
343                        if let Err(db_error) = res {
344                            error!("Got db error `{db_error:?}`, expecting watcher to mark execution as timed out");
345                        }
346                    }
347                    .instrument(worker_span)
348                })
349            };
350            executions.push((execution_id, join_handle));
351        }
352        Ok(ExecutionProgress { executions })
353    }
354
355    async fn run_worker(
356        worker: Arc<dyn Worker>,
357        db_pool: &dyn DbPool,
358        clock_fn: C,
359        locked_execution: LockedExecution,
360        retry_config: ComponentRetryConfig,
361        worker_span: Span,
362    ) -> Result<(), DbErrorWrite> {
363        debug!("Worker::run starting");
364        trace!(
365            version = %locked_execution.next_version,
366            params = ?locked_execution.params,
367            event_history = ?locked_execution.event_history,
368            "Worker::run starting"
369        );
370        let can_be_retried = ExecutionLog::can_be_retried_after(
371            locked_execution.intermittent_event_count + 1,
372            retry_config.max_retries,
373            retry_config.retry_exp_backoff,
374        );
375        let unlock_expiry_on_limit_reached =
376            ExecutionLog::compute_retry_duration_when_retrying_forever(
377                locked_execution.intermittent_event_count + 1,
378                retry_config.retry_exp_backoff,
379            );
380        let ctx = WorkerContext {
381            execution_id: locked_execution.execution_id.clone(),
382            metadata: locked_execution.metadata,
383            ffqn: locked_execution.ffqn,
384            params: locked_execution.params,
385            event_history: locked_execution.event_history,
386            responses: locked_execution
387                .responses
388                .into_iter()
389                .map(|outer| outer.event)
390                .collect(),
391            version: locked_execution.next_version,
392            can_be_retried: can_be_retried.is_some(),
393            locked_event: locked_execution.locked_event,
394            worker_span,
395        };
396        let worker_result = worker.run(ctx).await;
397        trace!(?worker_result, "Worker::run finished");
398        let result_obtained_at = clock_fn.now();
399        match Self::worker_result_to_execution_event(
400            locked_execution.execution_id,
401            worker_result,
402            result_obtained_at,
403            locked_execution.parent,
404            can_be_retried,
405            unlock_expiry_on_limit_reached,
406        )? {
407            Some(append) => {
408                trace!("Appending {append:?}");
409                let db_exec = db_pool.db_exec_conn().await?;
410                match append {
411                    AppendOrCancel::Cancel {
412                        execution_id,
413                        cancelled_at,
414                    } => db_exec
415                        .cancel_activity_with_retries(&execution_id, cancelled_at)
416                        .await
417                        .map(|_| ()),
418                    AppendOrCancel::Other(append) => append.append(db_exec.as_ref()).await,
419                }
420            }
421            None => Ok(()),
422        }
423    }
424
425    /// Map the `WorkerError` to an temporary or a permanent failure.
426    fn worker_result_to_execution_event(
427        execution_id: ExecutionId,
428        worker_result: WorkerResult,
429        result_obtained_at: DateTime<Utc>,
430        parent: Option<(ExecutionId, JoinSetId)>,
431        can_be_retried: Option<Duration>,
432        unlock_expiry_on_limit_reached: Duration,
433    ) -> Result<Option<AppendOrCancel>, DbErrorWrite> {
434        Ok(match worker_result {
435            WorkerResult::Ok(result, new_version, http_client_traces) => {
436                info!("Execution finished: {result}");
437                let child_finished =
438                    parent.map(
439                        |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
440                            parent_execution_id,
441                            parent_join_set,
442                            result: result.clone(),
443                        },
444                    );
445                let primary_event = AppendRequest {
446                    created_at: result_obtained_at,
447                    event: ExecutionRequest::Finished {
448                        result,
449                        http_client_traces,
450                    },
451                };
452
453                Some(AppendOrCancel::Other(Append {
454                    created_at: result_obtained_at,
455                    primary_event,
456                    execution_id,
457                    version: new_version,
458                    child_finished,
459                }))
460            }
461            WorkerResult::DbUpdatedByWorkerOrWatcher => None,
462            WorkerResult::Err(err) => {
463                let reason_generic = err.to_string(); // Override with err's reason if no information is lost.
464
465                let (primary_event, child_finished, version) = match err {
466                    WorkerError::TemporaryTimeout {
467                        http_client_traces,
468                        version,
469                    } => {
470                        if let Some(duration) = can_be_retried {
471                            let backoff_expires_at = result_obtained_at + duration;
472                            info!(
473                                "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
474                            );
475                            (
476                                ExecutionRequest::TemporarilyTimedOut {
477                                    backoff_expires_at,
478                                    http_client_traces,
479                                },
480                                None,
481                                version,
482                            )
483                        } else {
484                            info!("Execution timed out");
485                            let result = SupportedFunctionReturnValue::ExecutionError(
486                                FinishedExecutionError {
487                                    kind: ExecutionFailureKind::TimedOut,
488                                    reason: None,
489                                    detail: None,
490                                },
491                            );
492                            let child_finished =
493                                parent.map(|(parent_execution_id, parent_join_set)| {
494                                    ChildFinishedResponse {
495                                        parent_execution_id,
496                                        parent_join_set,
497                                        result: result.clone(),
498                                    }
499                                });
500                            (
501                                ExecutionRequest::Finished {
502                                    result,
503                                    http_client_traces,
504                                },
505                                child_finished,
506                                version,
507                            )
508                        }
509                    }
510                    WorkerError::DbError(db_error) => {
511                        return Err(db_error);
512                    }
513                    WorkerError::ActivityTrap {
514                        reason: _, // reason_generic contains trap_kind + reason
515                        trap_kind,
516                        detail,
517                        version,
518                        http_client_traces,
519                    } => {
520                        if let Some(duration) = can_be_retried {
521                            let expires_at = result_obtained_at + duration;
522                            debug!(
523                                "Retrying activity with `{trap_kind}` execution after {duration:?} at {expires_at}"
524                            );
525                            (
526                                ExecutionRequest::TemporarilyFailed {
527                                    reason: StrVariant::from(reason_generic),
528                                    backoff_expires_at: expires_at,
529                                    detail,
530                                    http_client_traces,
531                                },
532                                None,
533                                version,
534                            )
535                        } else {
536                            info!(
537                                "Activity with `{trap_kind}` marked as permanent failure - {reason_generic}"
538                            );
539                            let result = SupportedFunctionReturnValue::ExecutionError(
540                                FinishedExecutionError {
541                                    reason: Some(reason_generic),
542                                    kind: ExecutionFailureKind::Uncategorized,
543                                    detail,
544                                },
545                            );
546                            let child_finished =
547                                parent.map(|(parent_execution_id, parent_join_set)| {
548                                    ChildFinishedResponse {
549                                        parent_execution_id,
550                                        parent_join_set,
551                                        result: result.clone(),
552                                    }
553                                });
554                            (
555                                ExecutionRequest::Finished {
556                                    result,
557                                    http_client_traces,
558                                },
559                                child_finished,
560                                version,
561                            )
562                        }
563                    }
564                    WorkerError::ActivityPreopenedDirError {
565                        reason,
566                        detail,
567                        version,
568                    } => {
569                        let http_client_traces = None;
570                        if let Some(duration) = can_be_retried {
571                            let expires_at = result_obtained_at + duration;
572                            debug!(
573                                "Retrying activity with ActivityPreopenedDirError `{reason}` execution after {duration:?} at {expires_at}"
574                            );
575                            (
576                                ExecutionRequest::TemporarilyFailed {
577                                    reason: StrVariant::from(reason_generic),
578                                    backoff_expires_at: expires_at,
579                                    detail: Some(detail),
580                                    http_client_traces,
581                                },
582                                None,
583                                version,
584                            )
585                        } else {
586                            info!(
587                                "Activity with ActivityPreopenedDirError `{reason}` marked as permanent failure - {reason_generic}"
588                            );
589                            let result = SupportedFunctionReturnValue::ExecutionError(
590                                FinishedExecutionError {
591                                    reason: Some(reason_generic),
592                                    kind: ExecutionFailureKind::Uncategorized,
593                                    detail: Some(detail),
594                                },
595                            );
596                            let child_finished =
597                                parent.map(|(parent_execution_id, parent_join_set)| {
598                                    ChildFinishedResponse {
599                                        parent_execution_id,
600                                        parent_join_set,
601                                        result: result.clone(),
602                                    }
603                                });
604                            (
605                                ExecutionRequest::Finished {
606                                    result,
607                                    http_client_traces,
608                                },
609                                child_finished,
610                                version,
611                            )
612                        }
613                    }
614                    WorkerError::ActivityReturnedError {
615                        detail,
616                        version,
617                        http_client_traces,
618                    } => {
619                        let duration = can_be_retried.expect(
620                            "ActivityReturnedError must not be returned when retries are exhausted",
621                        );
622                        let expires_at = result_obtained_at + duration;
623                        debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
624                        (
625                            ExecutionRequest::TemporarilyFailed {
626                                backoff_expires_at: expires_at,
627                                reason: StrVariant::Static("activity returned error"), // is same as the variant's display message.
628                                detail, // contains the backtrace
629                                http_client_traces,
630                            },
631                            None,
632                            version,
633                        )
634                    }
635                    WorkerError::LimitReached {
636                        reason,
637                        version: new_version,
638                    } => {
639                        let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
640                        warn!(
641                            "Limit reached: {reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
642                        );
643                        (
644                            ExecutionRequest::Unlocked {
645                                backoff_expires_at: expires_at,
646                                reason: StrVariant::from(reason),
647                            },
648                            None,
649                            new_version,
650                        )
651                    }
652                    WorkerError::FatalError(FatalError::Cancelled, _version) => {
653                        // use more optimized `cancel` to avoid polluting the logs with errors on a race.
654                        return Ok(Some(AppendOrCancel::Cancel {
655                            execution_id,
656                            cancelled_at: result_obtained_at,
657                        }));
658                    }
659                    WorkerError::FatalError(fatal_error, version) => {
660                        warn!("Fatal worker error - {fatal_error:?}");
661                        let result = SupportedFunctionReturnValue::ExecutionError(
662                            FinishedExecutionError::from(fatal_error),
663                        );
664                        let child_finished =
665                            parent.map(|(parent_execution_id, parent_join_set)| {
666                                ChildFinishedResponse {
667                                    parent_execution_id,
668                                    parent_join_set,
669                                    result: result.clone(),
670                                }
671                            });
672                        (
673                            ExecutionRequest::Finished {
674                                result,
675                                http_client_traces: None,
676                            },
677                            child_finished,
678                            version,
679                        )
680                    }
681                };
682                Some(AppendOrCancel::Other(Append {
683                    created_at: result_obtained_at,
684                    primary_event: AppendRequest {
685                        created_at: result_obtained_at,
686                        event: primary_event,
687                    },
688                    execution_id,
689                    version,
690                    child_finished,
691                }))
692            }
693        })
694    }
695}
696
697#[derive(Debug, Clone)]
698pub(crate) struct ChildFinishedResponse {
699    pub(crate) parent_execution_id: ExecutionId,
700    pub(crate) parent_join_set: JoinSetId,
701    pub(crate) result: SupportedFunctionReturnValue,
702}
703
704#[derive(Debug, Clone)]
705#[expect(clippy::large_enum_variant)]
706pub(crate) enum AppendOrCancel {
707    Cancel {
708        execution_id: ExecutionId,
709        cancelled_at: DateTime<Utc>,
710    },
711    Other(Append),
712}
713
714#[derive(Debug, Clone)]
715pub(crate) struct Append {
716    pub(crate) created_at: DateTime<Utc>,
717    pub(crate) primary_event: AppendRequest,
718    pub(crate) execution_id: ExecutionId,
719    pub(crate) version: Version,
720    pub(crate) child_finished: Option<ChildFinishedResponse>,
721}
722
723impl Append {
724    pub(crate) async fn append(self, db_exec: &dyn DbExecutor) -> Result<(), DbErrorWrite> {
725        if let Some(child_finished) = self.child_finished {
726            assert_matches!(
727                &self.primary_event,
728                AppendRequest {
729                    event: ExecutionRequest::Finished { .. },
730                    ..
731                }
732            );
733            let child_execution_id = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
734            let events = AppendEventsToExecution {
735                execution_id: self.execution_id,
736                version: self.version.clone(),
737                batch: vec![self.primary_event],
738            };
739            let response = AppendResponseToExecution {
740                parent_execution_id: child_finished.parent_execution_id,
741                created_at: self.created_at,
742                join_set_id: child_finished.parent_join_set,
743                child_execution_id,
744                finished_version: self.version, // Since self.primary_event is a finished event, the version will remain the same.
745                result: child_finished.result,
746            };
747
748            db_exec
749                .append_batch_respond_to_parent(events, response, self.created_at)
750                .await?;
751        } else {
752            db_exec
753                .append(self.execution_id, self.version, self.primary_event)
754                .await?;
755        }
756        Ok(())
757    }
758}
759
760fn log_err_if_new<T>(
761    res: Result<T, DbErrorGeneric>,
762    old_err: &mut Option<DbErrorGeneric>,
763) -> Result<T, ()> {
764    match (res, &old_err) {
765        (Ok(ok), _) => {
766            *old_err = None;
767            Ok(ok)
768        }
769        (Err(err), Some(old)) if err == *old => Err(()),
770        (Err(err), _) => {
771            warn!("Tick failed: {err:?}");
772            *old_err = Some(err);
773            Err(())
774        }
775    }
776}
777
/// Test helper: a scripted `Worker` whose runs are driven by a map of expectations.
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes, RETURN_TYPE_DUMMY,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");
    /// Expected `(version, event history)` per run, plus the result to return. Each run
    /// consumes the entry at the back of the map.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    /// Exported metadata for a single submittable, parameterless function with a dummy
    /// return type.
    fn dummy_metadata(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
        [FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: RETURN_TYPE_DUMMY,
            extension: None,
            submittable: true,
        }]
    }

    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    impl SimpleWorker {
        /// A worker expecting exactly one run at version 2 with an empty history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let expectations = IndexMap::from([(Version::new(2), (vec![], res))]);
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(expectations)))
        }

        /// Keeps the scripted results but exports (and reports) `ffqn` instead.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            Self {
                exported: dummy_metadata(ffqn.clone()),
                ffqn,
                ..self
            }
        }

        /// A worker exporting `FFQN_SOME`, driven by the given expectations.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: dummy_metadata(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            // Lock, pop and unwrap in one statement: an exhausted map panics while the guard
            // is still alive.
            let (expected_version, (expected_eh, worker_result)) =
                self.worker_results_rev.lock().unwrap().pop().unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            // Versions are ignored; only the sequence of events is compared.
            let actual_eh = ctx
                .event_history
                .iter()
                .map(|(event, _version)| event.clone())
                .collect::<Vec<_>>();
            assert_eq!(expected_eh, actual_eh);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}
863
864#[cfg(test)]
865mod tests {
866    use self::simple_worker::SimpleWorker;
867    use super::*;
868    use crate::{expired_timers_watcher, worker::WorkerResult};
869    use assert_matches::assert_matches;
870    use async_trait::async_trait;
871    use concepts::storage::{
872        CreateRequest, DbConnectionTest, JoinSetRequest, JoinSetResponse, JoinSetResponseEvent,
873    };
874    use concepts::storage::{DbPoolCloseable, LockedBy};
875    use concepts::storage::{ExecutionEvent, ExecutionRequest, HistoryEvent, PendingState};
876    use concepts::time::Now;
877    use concepts::{
878        FunctionMetadata, JoinSetKind, ParameterTypes, Params, RETURN_TYPE_DUMMY,
879        SUPPORTED_RETURN_VALUE_OK_EMPTY, StrVariant, SupportedFunctionReturnValue, TrapKind,
880    };
881    use db_tests::Database;
882    use indexmap::IndexMap;
883    use rstest::rstest;
884    use simple_worker::FFQN_SOME;
885    use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
886    use test_db_macro::expand_enum_database;
887    use test_utils::set_up;
888    use test_utils::sim_clock::{ConstClock, SimClock};
889
890    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
891
892    async fn tick_fn<W: Worker + Debug, C: ClockFn + 'static>(
893        config: ExecConfig,
894        clock_fn: C,
895        db_pool: Arc<dyn DbPool>,
896        worker: Arc<W>,
897        executed_at: DateTime<Utc>,
898    ) -> Vec<ExecutionId> {
899        trace!("Ticking with {worker:?}");
900        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
901        let executor = ExecTask::new_test(worker, config, clock_fn, db_pool, ffqns);
902        executor
903            .tick_test_await(executed_at, RunId::generate())
904            .await
905    }
906
907    #[expand_enum_database]
908    #[rstest]
909    #[tokio::test]
910    async fn execute_simple_lifecycle_tick_based(
911        database: Database,
912        #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentId)]
913        locking_strategy: LockingStrategy,
914    ) {
915        set_up();
916        let created_at = Now.now();
917        let (_guard, db_pool, db_close) = database.set_up().await;
918        let db_connection = db_pool.connection_test().await.unwrap();
919        execute_simple_lifecycle_tick_based_inner(
920            db_connection.as_ref(),
921            db_pool.clone(),
922            ConstClock(created_at),
923            locking_strategy,
924        )
925        .await;
926        drop(db_connection);
927        db_close.close().await;
928    }
929
930    async fn execute_simple_lifecycle_tick_based_inner<C: ClockFn + 'static>(
931        db_connection: &dyn DbConnectionTest,
932        db_pool: Arc<dyn DbPool>,
933        clock_fn: C,
934        locking_strategy: LockingStrategy,
935    ) {
936        let created_at = clock_fn.now();
937        let exec_config = ExecConfig {
938            batch_size: 1,
939            lock_expiry: Duration::from_secs(1),
940            tick_sleep: Duration::from_millis(100),
941            component_id: ComponentId::dummy_activity(),
942            task_limiter: None,
943            executor_id: ExecutorId::generate(),
944            retry_config: ComponentRetryConfig::ZERO,
945            locking_strategy,
946        };
947
948        let execution_log = create_and_tick(
949            CreateAndTickConfig {
950                execution_id: ExecutionId::generate(),
951                created_at,
952                executed_at: created_at,
953            },
954            clock_fn,
955            db_connection,
956            db_pool,
957            exec_config,
958            Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
959                SUPPORTED_RETURN_VALUE_OK_EMPTY,
960                Version::new(2),
961                None,
962            ))),
963            tick_fn,
964        )
965        .await;
966        assert_matches!(
967            execution_log.events.get(2).unwrap(),
968            ExecutionEvent {
969                event: ExecutionRequest::Finished {
970                    result: SupportedFunctionReturnValue::Ok { ok: None },
971                    http_client_traces: None
972                },
973                created_at: _,
974                backtrace_id: None,
975                version: Version(2),
976            }
977        );
978    }
979
980    #[tokio::test]
981    async fn execute_simple_lifecycle_task_based_mem() {
982        set_up();
983        let created_at = Now.now();
984        let clock_fn = ConstClock(created_at);
985        let (_guard, db_pool, db_close) = Database::Memory.set_up().await;
986        let exec_config = ExecConfig {
987            batch_size: 1,
988            lock_expiry: Duration::from_secs(1),
989            tick_sleep: Duration::ZERO,
990            component_id: ComponentId::dummy_activity(),
991            task_limiter: None,
992            executor_id: ExecutorId::generate(),
993            retry_config: ComponentRetryConfig::ZERO,
994            locking_strategy: LockingStrategy::default(),
995        };
996
997        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
998            SUPPORTED_RETURN_VALUE_OK_EMPTY,
999            Version::new(2),
1000            None,
1001        )));
1002        let db_connection = db_pool.connection_test().await.unwrap();
1003
1004        let execution_log = create_and_tick(
1005            CreateAndTickConfig {
1006                execution_id: ExecutionId::generate(),
1007                created_at,
1008                executed_at: created_at,
1009            },
1010            clock_fn,
1011            db_connection.as_ref(),
1012            db_pool,
1013            exec_config,
1014            worker,
1015            tick_fn,
1016        )
1017        .await;
1018        assert_matches!(
1019            execution_log.events.get(2).unwrap(),
1020            ExecutionEvent {
1021                event: ExecutionRequest::Finished {
1022                    result: SupportedFunctionReturnValue::Ok { ok: None },
1023                    http_client_traces: None
1024                },
1025                created_at: _,
1026                backtrace_id: None,
1027                version: Version(2),
1028            }
1029        );
1030        db_close.close().await;
1031    }
1032
1033    struct CreateAndTickConfig {
1034        execution_id: ExecutionId,
1035        created_at: DateTime<Utc>,
1036        executed_at: DateTime<Utc>,
1037    }
1038
1039    async fn create_and_tick<
1040        W: Worker,
1041        C: ClockFn,
1042        T: FnMut(ExecConfig, C, Arc<dyn DbPool>, Arc<W>, DateTime<Utc>) -> F,
1043        F: Future<Output = Vec<ExecutionId>>,
1044    >(
1045        config: CreateAndTickConfig,
1046        clock_fn: C,
1047        db_connection: &dyn DbConnectionTest,
1048        db_pool: Arc<dyn DbPool>,
1049        exec_config: ExecConfig,
1050        worker: Arc<W>,
1051        mut tick: T,
1052    ) -> ExecutionLog {
1053        // Create an execution
1054        db_connection
1055            .create(CreateRequest {
1056                created_at: config.created_at,
1057                execution_id: config.execution_id.clone(),
1058                ffqn: FFQN_SOME,
1059                params: Params::empty(),
1060                parent: None,
1061                metadata: concepts::ExecutionMetadata::empty(),
1062                scheduled_at: config.created_at,
1063                component_id: ComponentId::dummy_activity(),
1064                scheduled_by: None,
1065            })
1066            .await
1067            .unwrap();
1068        // execute!
1069        tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
1070        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
1071        debug!("Execution history after tick: {execution_log:?}");
1072        // check that DB contains Created and Locked events.
1073        let actually_created_at = assert_matches!(
1074            execution_log.events.first().unwrap(),
1075            ExecutionEvent {
1076                event: ExecutionRequest::Created { .. },
1077                created_at: actually_created_at,
1078                backtrace_id: None,
1079                version: Version(0),
1080            }
1081            => *actually_created_at
1082        );
1083        assert_eq!(config.created_at, actually_created_at);
1084        let locked_at = assert_matches!(
1085            execution_log.events.get(1).unwrap(),
1086            ExecutionEvent {
1087                event: ExecutionRequest::Locked { .. },
1088                created_at: locked_at,
1089                backtrace_id: None,
1090                version: Version(1),
1091            } if config.created_at <= *locked_at
1092            => *locked_at
1093        );
1094        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
1095            event: _,
1096            created_at: executed_at,
1097            backtrace_id: None,
1098            version: Version(2),
1099        } if *executed_at >= locked_at);
1100        execution_log
1101    }
1102
1103    #[tokio::test]
1104    async fn activity_trap_should_trigger_an_execution_retry() {
1105        set_up();
1106        let sim_clock = SimClock::default();
1107        let (_guard, db_pool, db_close) = Database::Memory.set_up().await;
1108        let retry_exp_backoff = Duration::from_millis(100);
1109        let retry_config = ComponentRetryConfig {
1110            max_retries: Some(1),
1111            retry_exp_backoff,
1112        };
1113        let exec_config = ExecConfig {
1114            batch_size: 1,
1115            lock_expiry: Duration::from_secs(1),
1116            tick_sleep: Duration::ZERO,
1117            component_id: ComponentId::dummy_activity(),
1118            task_limiter: None,
1119            executor_id: ExecutorId::generate(),
1120            retry_config,
1121            locking_strategy: LockingStrategy::default(),
1122        };
1123        let expected_reason = "error reason";
1124        let expected_detail = "error detail";
1125        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1126            WorkerError::ActivityTrap {
1127                reason: expected_reason.to_string(),
1128                trap_kind: concepts::TrapKind::Trap,
1129                detail: Some(expected_detail.to_string()),
1130                version: Version::new(2),
1131                http_client_traces: None,
1132            },
1133        )));
1134        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
1135        let db_connection = db_pool.connection_test().await.unwrap();
1136        let execution_log = create_and_tick(
1137            CreateAndTickConfig {
1138                execution_id: ExecutionId::generate(),
1139                created_at: sim_clock.now(),
1140                executed_at: sim_clock.now(),
1141            },
1142            sim_clock.clone(),
1143            db_connection.as_ref(),
1144            db_pool.clone(),
1145            exec_config.clone(),
1146            worker,
1147            tick_fn,
1148        )
1149        .await;
1150        assert_eq!(3, execution_log.events.len());
1151        {
1152            let (reason, detail, at, expires_at) = assert_matches!(
1153                &execution_log.events.get(2).unwrap(),
1154                ExecutionEvent {
1155                    event: ExecutionRequest::TemporarilyFailed {
1156                        reason,
1157                        detail,
1158                        backoff_expires_at,
1159                        http_client_traces: None,
1160                    },
1161                    created_at: at,
1162                    backtrace_id: None,
1163                    version: Version(2),
1164                }
1165                => (reason, detail, *at, *backoff_expires_at)
1166            );
1167            assert_eq!(format!("activity trap: {expected_reason}"), reason.deref());
1168            assert_eq!(Some(expected_detail), detail.as_deref());
1169            assert_eq!(at, sim_clock.now());
1170            assert_eq!(sim_clock.now() + retry_config.retry_exp_backoff, expires_at);
1171        }
1172        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
1173            std::sync::Mutex::new(IndexMap::from([(
1174                Version::new(4),
1175                (
1176                    vec![],
1177                    WorkerResult::Ok(SUPPORTED_RETURN_VALUE_OK_EMPTY, Version::new(4), None),
1178                ),
1179            )])),
1180        )));
1181        // noop until `retry_exp_backoff` expires
1182        assert!(
1183            tick_fn(
1184                exec_config.clone(),
1185                sim_clock.clone(),
1186                db_pool.clone(),
1187                worker.clone(),
1188                sim_clock.now(),
1189            )
1190            .await
1191            .is_empty()
1192        );
1193        // tick again to finish the execution
1194        sim_clock.move_time_forward(retry_config.retry_exp_backoff);
1195        tick_fn(
1196            exec_config,
1197            sim_clock.clone(),
1198            db_pool.clone(),
1199            worker,
1200            sim_clock.now(),
1201        )
1202        .await;
1203        let execution_log = {
1204            let db_connection = db_pool.connection_test().await.unwrap();
1205            db_connection
1206                .get(&execution_log.execution_id)
1207                .await
1208                .unwrap()
1209        };
1210        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
1211        assert_matches!(
1212            execution_log.events.get(3).unwrap(),
1213            ExecutionEvent {
1214                event: ExecutionRequest::Locked { .. },
1215                created_at: at,
1216                backtrace_id: None,
1217                version: Version(3),
1218            } if *at == sim_clock.now()
1219        );
1220        assert_matches!(
1221            execution_log.events.get(4).unwrap(),
1222            ExecutionEvent {
1223                event: ExecutionRequest::Finished {
1224                    result: SupportedFunctionReturnValue::Ok{ok:None},
1225                    http_client_traces: None
1226                },
1227                created_at: finished_at,
1228                backtrace_id: None,
1229                version: Version(4),
1230            } if *finished_at == sim_clock.now()
1231        );
1232        db_close.close().await;
1233    }
1234
1235    #[tokio::test]
1236    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1237        set_up();
1238        let created_at = Now.now();
1239        let clock_fn = ConstClock(created_at);
1240        let (_guard, db_pool, db_close) = Database::Memory.set_up().await;
1241        let exec_config = ExecConfig {
1242            batch_size: 1,
1243            lock_expiry: Duration::from_secs(1),
1244            tick_sleep: Duration::ZERO,
1245            component_id: ComponentId::dummy_activity(),
1246            task_limiter: None,
1247            executor_id: ExecutorId::generate(),
1248            retry_config: ComponentRetryConfig::ZERO,
1249            locking_strategy: LockingStrategy::default(),
1250        };
1251
1252        let reason = "error reason";
1253        let expected_reason = format!("activity trap: {reason}");
1254        let expected_detail = "error detail";
1255        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1256            WorkerError::ActivityTrap {
1257                reason: reason.to_string(),
1258                trap_kind: concepts::TrapKind::Trap,
1259                detail: Some(expected_detail.to_string()),
1260                version: Version::new(2),
1261                http_client_traces: None,
1262            },
1263        )));
1264        let execution_log = create_and_tick(
1265            CreateAndTickConfig {
1266                execution_id: ExecutionId::generate(),
1267                created_at,
1268                executed_at: created_at,
1269            },
1270            clock_fn,
1271            db_pool.connection_test().await.unwrap().as_ref(),
1272            db_pool.clone(),
1273            exec_config.clone(),
1274            worker,
1275            tick_fn,
1276        )
1277        .await;
1278        assert_eq!(3, execution_log.events.len());
1279        let (reason, kind, detail) = assert_matches!(
1280            &execution_log.events.get(2).unwrap(),
1281            ExecutionEvent {
1282                event: ExecutionRequest::Finished{
1283                    result: SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError{reason, kind, detail}),
1284                    http_client_traces: None
1285                },
1286                created_at: at,
1287                backtrace_id: None,
1288                version: Version(2),
1289            } if *at == created_at
1290            => (reason, kind, detail)
1291        );
1292
1293        assert_eq!(Some(expected_reason), *reason);
1294        assert_eq!(Some(expected_detail), detail.as_deref());
1295        assert_eq!(ExecutionFailureKind::Uncategorized, *kind);
1296
1297        db_close.close().await;
1298    }
1299
1300    #[tokio::test]
1301    async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1302        let worker_error = WorkerError::ActivityTrap {
1303            reason: "error reason".to_string(),
1304            trap_kind: TrapKind::Trap,
1305            detail: Some("detail".to_string()),
1306            version: Version::new(2),
1307            http_client_traces: None,
1308        };
1309        let expected_child_err = FinishedExecutionError {
1310            kind: ExecutionFailureKind::Uncategorized,
1311            reason: Some("activity trap: error reason".to_string()),
1312            detail: Some("detail".to_string()),
1313        };
1314        child_execution_permanently_failed_should_notify_parent(
1315            WorkerResult::Err(worker_error),
1316            expected_child_err,
1317        )
1318        .await;
1319    }
1320
1321    #[tokio::test]
1322    async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1323        let expected_child_err = FinishedExecutionError {
1324            kind: ExecutionFailureKind::TimedOut,
1325            reason: None,
1326            detail: None,
1327        };
1328        child_execution_permanently_failed_should_notify_parent(
1329            WorkerResult::DbUpdatedByWorkerOrWatcher,
1330            expected_child_err,
1331        )
1332        .await;
1333    }
1334
1335    async fn child_execution_permanently_failed_should_notify_parent(
1336        worker_result: WorkerResult,
1337        expected_child_err: FinishedExecutionError,
1338    ) {
1339        use concepts::storage::JoinSetResponseEventOuter;
1340        const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1341
1342        set_up();
1343        let sim_clock = SimClock::default();
1344        let (_guard, db_pool, db_close) = Database::Memory.set_up().await;
1345
1346        let parent_worker = Arc::new(SimpleWorker::with_single_result(
1347            WorkerResult::DbUpdatedByWorkerOrWatcher,
1348        ));
1349        let parent_execution_id = ExecutionId::generate();
1350        db_pool
1351            .connection()
1352            .await
1353            .unwrap()
1354            .create(CreateRequest {
1355                created_at: sim_clock.now(),
1356                execution_id: parent_execution_id.clone(),
1357                ffqn: FFQN_SOME,
1358                params: Params::empty(),
1359                parent: None,
1360                metadata: concepts::ExecutionMetadata::empty(),
1361                scheduled_at: sim_clock.now(),
1362                component_id: ComponentId::dummy_activity(),
1363                scheduled_by: None,
1364            })
1365            .await
1366            .unwrap();
1367        let parent_executor_id = ExecutorId::generate();
1368        tick_fn(
1369            ExecConfig {
1370                batch_size: 1,
1371                lock_expiry: LOCK_EXPIRY,
1372                tick_sleep: Duration::ZERO,
1373                component_id: ComponentId::dummy_activity(),
1374                task_limiter: None,
1375                executor_id: parent_executor_id,
1376                retry_config: ComponentRetryConfig::ZERO,
1377                locking_strategy: LockingStrategy::default(),
1378            },
1379            sim_clock.clone(),
1380            db_pool.clone(),
1381            parent_worker,
1382            sim_clock.now(),
1383        )
1384        .await;
1385
1386        let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1387        let child_execution_id = parent_execution_id.next_level(&join_set_id);
1388        // executor does not append anything, this should have been written by the worker:
1389        {
1390            let params = Params::empty();
1391            let child = CreateRequest {
1392                created_at: sim_clock.now(),
1393                execution_id: ExecutionId::Derived(child_execution_id.clone()),
1394                ffqn: FFQN_CHILD,
1395                params: params.clone(),
1396                parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1397                metadata: concepts::ExecutionMetadata::empty(),
1398                scheduled_at: sim_clock.now(),
1399                component_id: ComponentId::dummy_activity(),
1400                scheduled_by: None,
1401            };
1402            let current_time = sim_clock.now();
1403            let join_set = AppendRequest {
1404                created_at: current_time,
1405                event: ExecutionRequest::HistoryEvent {
1406                    event: HistoryEvent::JoinSetCreate {
1407                        join_set_id: join_set_id.clone(),
1408                    },
1409                },
1410            };
1411            let child_exec_req = AppendRequest {
1412                created_at: current_time,
1413                event: ExecutionRequest::HistoryEvent {
1414                    event: HistoryEvent::JoinSetRequest {
1415                        join_set_id: join_set_id.clone(),
1416                        request: JoinSetRequest::ChildExecutionRequest {
1417                            child_execution_id: child_execution_id.clone(),
1418                            target_ffqn: FFQN_CHILD,
1419                            params,
1420                        },
1421                    },
1422                },
1423            };
1424            let join_next = AppendRequest {
1425                created_at: current_time,
1426                event: ExecutionRequest::HistoryEvent {
1427                    event: HistoryEvent::JoinNext {
1428                        join_set_id: join_set_id.clone(),
1429                        run_expires_at: sim_clock.now(),
1430                        closing: false,
1431                        requested_ffqn: Some(FFQN_CHILD),
1432                    },
1433                },
1434            };
1435            db_pool
1436                .connection()
1437                .await
1438                .unwrap()
1439                .append_batch_create_new_execution(
1440                    current_time,
1441                    vec![join_set, child_exec_req, join_next],
1442                    parent_execution_id.clone(),
1443                    Version::new(2),
1444                    vec![child],
1445                )
1446                .await
1447                .unwrap();
1448        }
1449
1450        let child_worker =
1451            Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));
1452
1453        // execute the child
1454        tick_fn(
1455            ExecConfig {
1456                batch_size: 1,
1457                lock_expiry: LOCK_EXPIRY,
1458                tick_sleep: Duration::ZERO,
1459                component_id: ComponentId::dummy_activity(),
1460                task_limiter: None,
1461                executor_id: ExecutorId::generate(),
1462                retry_config: ComponentRetryConfig::ZERO,
1463                locking_strategy: LockingStrategy::default(),
1464            },
1465            sim_clock.clone(),
1466            db_pool.clone(),
1467            child_worker,
1468            sim_clock.now(),
1469        )
1470        .await;
1471        if matches!(expected_child_err.kind, ExecutionFailureKind::TimedOut) {
1472            // In case of timeout, let the timers watcher handle it
1473            sim_clock.move_time_forward(LOCK_EXPIRY);
1474            expired_timers_watcher::tick(
1475                db_pool.connection().await.unwrap().as_ref(),
1476                sim_clock.now(),
1477            )
1478            .await
1479            .unwrap();
1480        }
1481        let child_log = db_pool
1482            .connection_test()
1483            .await
1484            .unwrap()
1485            .get(&ExecutionId::Derived(child_execution_id.clone()))
1486            .await
1487            .unwrap();
1488        assert!(child_log.pending_state.is_finished());
1489        assert_eq!(
1490            Version(2),
1491            child_log.next_version,
1492            "created = 0, locked = 1, with_single_result = 2"
1493        );
1494        assert_eq!(
1495            ExecutionRequest::Finished {
1496                result: SupportedFunctionReturnValue::ExecutionError(expected_child_err),
1497                http_client_traces: None
1498            },
1499            child_log.last_event().event
1500        );
1501        let parent_log = db_pool
1502            .connection_test()
1503            .await
1504            .unwrap()
1505            .get(&parent_execution_id)
1506            .await
1507            .unwrap();
1508        assert_matches!(
1509            parent_log.pending_state,
1510            PendingState::PendingAt {
1511                scheduled_at,
1512                last_lock: Some(LockedBy { executor_id: found_executor_id, run_id: _}),
1513                component_id_input_digest: _
1514            } if scheduled_at == sim_clock.now() && found_executor_id == parent_executor_id,
1515            "parent should be back to pending"
1516        );
1517        let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1518            parent_log.responses.last(),
1519            Some(JoinSetResponseEventOuter{
1520                created_at: at,
1521                event: JoinSetResponseEvent{
1522                    join_set_id: found_join_set_id,
1523                    event: JoinSetResponse::ChildExecutionFinished {
1524                        child_execution_id: found_child_execution_id,
1525                        finished_version,
1526                        result: found_result,
1527                    }
1528                }
1529            })
1530             if *at == sim_clock.now()
1531            => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1532        );
1533        assert_eq!(join_set_id, *found_join_set_id);
1534        assert_eq!(child_execution_id, *found_child_execution_id);
1535        assert_eq!(child_log.next_version, *child_finished_version);
1536        assert_matches!(
1537            found_result,
1538            SupportedFunctionReturnValue::ExecutionError(_)
1539        );
1540
1541        db_close.close().await;
1542    }
1543
1544    #[derive(Clone, Debug)]
1545    struct SleepyWorker {
1546        duration: Duration,
1547        result: SupportedFunctionReturnValue,
1548        exported: [FunctionMetadata; 1],
1549    }
1550
1551    #[async_trait]
1552    impl Worker for SleepyWorker {
1553        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1554            tokio::time::sleep(self.duration).await;
1555            WorkerResult::Ok(self.result.clone(), ctx.version, None)
1556        }
1557
1558        fn exported_functions(&self) -> &[FunctionMetadata] {
1559            &self.exported
1560        }
1561    }
1562
1563    #[tokio::test]
1564    async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1565        set_up();
1566        let sim_clock = SimClock::default();
1567        let (_guard, db_pool, db_close) = Database::Memory.set_up().await;
1568        let lock_expiry = Duration::from_millis(100);
1569        let timeout_duration = Duration::from_millis(300);
1570        let retry_config = ComponentRetryConfig {
1571            max_retries: Some(1),
1572            retry_exp_backoff: timeout_duration,
1573        };
1574        let exec_config = ExecConfig {
1575            batch_size: 1,
1576            lock_expiry,
1577            tick_sleep: Duration::ZERO,
1578            component_id: ComponentId::dummy_activity(),
1579            task_limiter: None,
1580            executor_id: ExecutorId::generate(),
1581            retry_config,
1582            locking_strategy: LockingStrategy::default(),
1583        };
1584
1585        let worker = Arc::new(SleepyWorker {
1586            duration: lock_expiry + Duration::from_millis(1), // sleep more than allowed by the lock expiry
1587            result: SUPPORTED_RETURN_VALUE_OK_EMPTY,
1588            exported: [FunctionMetadata {
1589                ffqn: FFQN_SOME,
1590                parameter_types: ParameterTypes::default(),
1591                return_type: RETURN_TYPE_DUMMY,
1592                extension: None,
1593                submittable: true,
1594            }],
1595        });
1596        // Create an execution
1597        let execution_id = ExecutionId::generate();
1598        let db_connection = db_pool.connection_test().await.unwrap();
1599        db_connection
1600            .create(CreateRequest {
1601                created_at: sim_clock.now(),
1602                execution_id: execution_id.clone(),
1603                ffqn: FFQN_SOME,
1604                params: Params::empty(),
1605                parent: None,
1606                metadata: concepts::ExecutionMetadata::empty(),
1607                scheduled_at: sim_clock.now(),
1608                component_id: ComponentId::dummy_activity(),
1609                scheduled_by: None,
1610            })
1611            .await
1612            .unwrap();
1613
1614        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
1615        let executor = ExecTask::new_test(
1616            worker,
1617            exec_config.clone(),
1618            sim_clock.clone(),
1619            db_pool.clone(),
1620            ffqns,
1621        );
1622        let db_exec = db_pool.db_exec_conn().await.unwrap();
1623        let mut first_execution_progress = executor
1624            .tick(db_exec.as_ref(), sim_clock.now(), RunId::generate())
1625            .await
1626            .unwrap();
1627        assert_eq!(1, first_execution_progress.executions.len());
1628        // Started hanging, wait for lock expiry.
1629        sim_clock.move_time_forward(lock_expiry);
1630        // cleanup should be called
1631        let now_after_first_lock_expiry = sim_clock.now();
1632        {
1633            debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1634            let cleanup_progress = executor
1635                .tick(
1636                    db_pool.db_exec_conn().await.unwrap().as_ref(),
1637                    now_after_first_lock_expiry,
1638                    RunId::generate(),
1639                )
1640                .await
1641                .unwrap();
1642            assert!(cleanup_progress.executions.is_empty());
1643        }
1644        {
1645            let expired_locks = expired_timers_watcher::tick(
1646                db_pool.connection().await.unwrap().as_ref(),
1647                now_after_first_lock_expiry,
1648            )
1649            .await
1650            .unwrap()
1651            .expired_locks;
1652            assert_eq!(1, expired_locks);
1653        }
1654        assert!(
1655            !first_execution_progress
1656                .executions
1657                .pop()
1658                .unwrap()
1659                .1
1660                .is_finished()
1661        );
1662
1663        let execution_log = db_connection.get(&execution_id).await.unwrap();
1664        let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1665        assert_matches!(
1666            &execution_log.events.get(2).unwrap(),
1667            ExecutionEvent {
1668                event: ExecutionRequest::TemporarilyTimedOut { backoff_expires_at, .. },
1669                created_at: at,
1670                backtrace_id: None,
1671                version: Version(2),
1672            } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1673        );
1674        assert_matches!(
1675            execution_log.pending_state,
1676            PendingState::PendingAt {
1677                scheduled_at: found_scheduled_by,
1678                last_lock: Some(LockedBy {
1679                    executor_id: found_executor_id,
1680                    run_id: _,
1681                }),
1682                component_id_input_digest: _
1683            } if found_scheduled_by == expected_first_timeout_expiry && found_executor_id == exec_config.executor_id
1684        );
1685        sim_clock.move_time_forward(timeout_duration);
1686        let now_after_first_timeout = sim_clock.now();
1687        debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1688
1689        let mut second_execution_progress = executor
1690            .tick(
1691                db_pool.db_exec_conn().await.unwrap().as_ref(),
1692                now_after_first_timeout,
1693                RunId::generate(),
1694            )
1695            .await
1696            .unwrap();
1697        assert_eq!(1, second_execution_progress.executions.len());
1698
1699        // Started hanging, wait for lock expiry.
1700        sim_clock.move_time_forward(lock_expiry);
1701        // cleanup should be called
1702        let now_after_second_lock_expiry = sim_clock.now();
1703        debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1704        {
1705            let cleanup_progress = executor
1706                .tick(
1707                    db_pool.db_exec_conn().await.unwrap().as_ref(),
1708                    now_after_second_lock_expiry,
1709                    RunId::generate(),
1710                )
1711                .await
1712                .unwrap();
1713            assert!(cleanup_progress.executions.is_empty());
1714        }
1715        {
1716            let expired_locks = expired_timers_watcher::tick(
1717                db_pool.connection().await.unwrap().as_ref(),
1718                now_after_second_lock_expiry,
1719            )
1720            .await
1721            .unwrap()
1722            .expired_locks;
1723            assert_eq!(1, expired_locks);
1724        }
1725        assert!(
1726            !second_execution_progress
1727                .executions
1728                .pop()
1729                .unwrap()
1730                .1
1731                .is_finished()
1732        );
1733
1734        drop(db_connection);
1735        drop(executor);
1736        db_close.close().await;
1737    }
1738}