obeli_sk_executor/
executor.rs

1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::prefixed_ulid::RunId;
5use concepts::storage::{
6    AppendEventsToExecution, AppendRequest, AppendResponseToExecution, DbErrorGeneric,
7    DbErrorWrite, DbExecutor, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
8    LockedExecution,
9};
10use concepts::time::{ClockFn, Sleep};
11use concepts::{
12    ComponentId, ComponentRetryConfig, FunctionMetadata, StrVariant, SupportedFunctionReturnValue,
13};
14use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
15use concepts::{
16    FinishedExecutionError,
17    storage::{ExecutionEventInner, JoinSetResponse, Version},
18};
19use concepts::{JoinSetId, PermanentFailureKind};
20use std::fmt::Display;
21use std::{
22    sync::{
23        Arc,
24        atomic::{AtomicBool, Ordering},
25    },
26    time::Duration,
27};
28use tokio::task::{AbortHandle, JoinHandle};
29use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
30
/// Configuration of a single executor instance.
#[derive(Debug, Clone)]
pub struct ExecConfig {
    /// Added to the tick time to compute when the lock on a fetched execution expires.
    pub lock_expiry: Duration,
    /// Duration handed to the `Sleep` implementation while waiting for pending executions
    /// between two ticks.
    pub tick_sleep: Duration,
    /// Upper bound on the number of permits acquired, and thus executions locked, per tick.
    pub batch_size: u32,
    /// Component identifier passed to the database when locking executions.
    pub component_id: ComponentId,
    /// Optional semaphore limiting the number of worker tasks; when `None`, only `batch_size`
    /// bounds a tick.
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
    /// Identifier of this executor, passed to the database when locking and used in tracing fields.
    pub executor_id: ExecutorId,
    /// Retry settings (`max_retries`, `retry_exp_backoff`) used to decide whether a failed
    /// execution is retried.
    pub retry_config: ComponentRetryConfig,
}
41
/// Executor task: locks pending executions of the functions listed in `ffqns` and runs each of
/// them on `worker` in its own tokio task.
pub struct ExecTask<C: ClockFn> {
    /// Worker that runs every locked execution.
    worker: Arc<dyn Worker>,
    config: ExecConfig,
    clock_fn: C, // Used for obtaining current time when the execution finishes.
    /// Database handle used for locking executions and appending their results.
    db_exec: Arc<dyn DbExecutor>,
    /// Function names requested when locking pending executions.
    ffqns: Arc<[FunctionFqn]>,
}
49
/// Executions started by a single tick, each paired with the join handle of its worker task.
#[derive(derive_more::Debug, Default)]
pub struct ExecutionProgress {
    // Only read by `wait_for_tasks`, which is compiled solely with the `test` feature,
    // hence the `dead_code` allowance.
    #[debug(skip)]
    #[allow(dead_code)]
    executions: Vec<(ExecutionId, JoinHandle<()>)>,
}
56
57impl ExecutionProgress {
58    #[cfg(feature = "test")]
59    pub async fn wait_for_tasks(self) -> Vec<ExecutionId> {
60        let mut vec = Vec::new();
61        for (exe, join_handle) in self.executions {
62            vec.push(exe);
63            join_handle.await.unwrap();
64        }
65        vec
66    }
67}
68
/// Handle to the executor task spawned by `ExecTask::spawn_new`.
///
/// Dropping the handle aborts the task if it is still running; call `close` for a graceful
/// shutdown.
#[derive(derive_more::Debug)]
pub struct ExecutorTaskHandle {
    /// Flag polled by the executor loop; `close` sets it to `true` to stop the loop.
    #[debug(skip)]
    is_closing: Arc<AtomicBool>,
    /// Used to observe whether the executor task has finished and to abort it on drop.
    #[debug(skip)]
    abort_handle: AbortHandle,
    component_id: ComponentId,
    executor_id: ExecutorId,
}
78
79impl ExecutorTaskHandle {
80    #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
81    pub async fn close(&self) {
82        trace!("Gracefully closing");
83        // TODO: Close all the workers tasks as well.
84        // Simple solution:
85        // Attempt to unlock all in-progress executions. Activities will not be penalized for shutdown, although the number of retries would increase,
86        // some progress will be lost.
87        // More complex solution:
88        // Activities will be awaited, up to some deadline, then unlocked.
89        // Workflows that are awating should be signaled using `DeadlineTracker`.
90        self.is_closing.store(true, Ordering::Relaxed);
91        while !self.abort_handle.is_finished() {
92            tokio::time::sleep(Duration::from_millis(1)).await;
93        }
94        debug!("Gracefully closed");
95    }
96}
97
98impl Drop for ExecutorTaskHandle {
99    #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
100    fn drop(&mut self) {
101        if self.abort_handle.is_finished() {
102            return;
103        }
104        warn!("Aborting the executor task");
105        self.abort_handle.abort();
106    }
107}
108
/// Test-only public wrapper around the private `extract_exported_ffqns_noext`.
#[cfg(feature = "test")]
pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    extract_exported_ffqns_noext(worker)
}
113
114fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
115    worker
116        .exported_functions()
117        .iter()
118        .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
119        .collect::<Arc<_>>()
120}
121
122impl<C: ClockFn + 'static> ExecTask<C> {
    /// Test-only constructor that takes an explicit list of `ffqns` instead of deriving it
    /// from the worker's exported functions.
    #[cfg(feature = "test")]
    pub fn new_test(
        worker: Arc<dyn Worker>,
        config: ExecConfig,
        clock_fn: C,
        db_exec: Arc<dyn DbExecutor>,
        ffqns: Arc<[FunctionFqn]>,
    ) -> Self {
        Self {
            worker,
            config,
            clock_fn,
            db_exec,
            ffqns,
        }
    }
139
140    #[cfg(feature = "test")]
141    pub fn new_all_ffqns_test(
142        worker: Arc<dyn Worker>,
143        config: ExecConfig,
144        clock_fn: C,
145        db_exec: Arc<dyn DbExecutor>,
146    ) -> Self {
147        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
148        Self {
149            worker,
150            config,
151            clock_fn,
152            db_exec,
153            ffqns,
154        }
155    }
156
157    pub fn spawn_new(
158        worker: Arc<dyn Worker>,
159        config: ExecConfig,
160        clock_fn: C,
161        db_exec: Arc<dyn DbExecutor>,
162        sleep: impl Sleep + 'static,
163    ) -> ExecutorTaskHandle {
164        let is_closing = Arc::new(AtomicBool::default());
165        let is_closing_inner = is_closing.clone();
166        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
167        let component_id = config.component_id.clone();
168        let executor_id = config.executor_id;
169        let abort_handle = tokio::spawn(async move {
170            debug!(executor_id = %config.executor_id, component_id = %config.component_id, "Spawned executor");
171            let task = ExecTask {
172                worker,
173                config,
174                db_exec,
175                ffqns: ffqns.clone(),
176                clock_fn: clock_fn.clone(),
177            };
178            while !is_closing_inner.load(Ordering::Relaxed) {
179                let _ = task.tick(clock_fn.now(), RunId::generate()).await;
180
181                task.db_exec
182                    .wait_for_pending(clock_fn.now(), ffqns.clone(), {
183                        let sleep = sleep.clone();
184                        Box::pin(async move { sleep.sleep(task.config.tick_sleep).await })})
185                    .await;
186            }
187        })
188        .abort_handle();
189        ExecutorTaskHandle {
190            is_closing,
191            abort_handle,
192            component_id,
193            executor_id,
194        }
195    }
196
197    fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
198        if let Some(task_limiter) = &self.config.task_limiter {
199            let mut locks = Vec::new();
200            for _ in 0..self.config.batch_size {
201                if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
202                    locks.push(Some(permit));
203                } else {
204                    break;
205                }
206            }
207            locks
208        } else {
209            let mut vec = Vec::with_capacity(self.config.batch_size as usize);
210            for _ in 0..self.config.batch_size {
211                vec.push(None);
212            }
213            vec
214        }
215    }
216
217    #[cfg(feature = "test")]
218    pub async fn tick_test(&self, executed_at: DateTime<Utc>, run_id: RunId) -> ExecutionProgress {
219        self.tick(executed_at, run_id).await.unwrap()
220    }
221
222    #[cfg(feature = "test")]
223    pub async fn tick_test_await(
224        &self,
225        executed_at: DateTime<Utc>,
226        run_id: RunId,
227    ) -> Vec<ExecutionId> {
228        self.tick(executed_at, run_id)
229            .await
230            .unwrap()
231            .wait_for_tasks()
232            .await
233    }
234
235    #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.config.executor_id, component_id = %self.config.component_id))]
236    async fn tick(
237        &self,
238        executed_at: DateTime<Utc>,
239        run_id: RunId,
240    ) -> Result<ExecutionProgress, DbErrorGeneric> {
241        let locked_executions = {
242            let mut permits = self.acquire_task_permits();
243            if permits.is_empty() {
244                return Ok(ExecutionProgress::default());
245            }
246            let lock_expires_at = executed_at + self.config.lock_expiry;
247            let locked_executions = self
248                .db_exec
249                .lock_pending(
250                    permits.len(), // batch size
251                    executed_at,   // fetch expiring before now
252                    self.ffqns.clone(),
253                    executed_at, // created at
254                    self.config.component_id.clone(),
255                    self.config.executor_id,
256                    lock_expires_at,
257                    run_id,
258                    self.config.retry_config,
259                )
260                .await?;
261            // Drop permits if too many were allocated.
262            while permits.len() > locked_executions.len() {
263                permits.pop();
264            }
265            assert_eq!(permits.len(), locked_executions.len());
266            locked_executions.into_iter().zip(permits)
267        };
268
269        let mut executions = Vec::with_capacity(locked_executions.len());
270        for (locked_execution, permit) in locked_executions {
271            let execution_id = locked_execution.execution_id.clone();
272            let join_handle = {
273                let worker = self.worker.clone();
274                let db = self.db_exec.clone();
275                let clock_fn = self.clock_fn.clone();
276                let worker_span = info_span!(parent: None, "worker",
277                    "otel.name" = format!("worker {}", locked_execution.ffqn),
278                    %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.config.executor_id, component_id = %self.config.component_id);
279                locked_execution.metadata.enrich(&worker_span);
280                tokio::spawn({
281                    let worker_span2 = worker_span.clone();
282                    let retry_config = self.config.retry_config;
283                    async move {
284                        let _permit = permit;
285                        let res = Self::run_worker(
286                            worker,
287                            db.as_ref(),
288                            clock_fn,
289                            locked_execution,
290                            retry_config,
291                            worker_span2,
292                        )
293                        .await;
294                        if let Err(db_error) = res {
295                            error!("Got db error `{db_error:?}`, expecting watcher to mark execution as timed out");
296                        }
297                    }
298                    .instrument(worker_span)
299                })
300            };
301            executions.push((execution_id, join_handle));
302        }
303        Ok(ExecutionProgress { executions })
304    }
305
306    async fn run_worker(
307        worker: Arc<dyn Worker>,
308        db_exec: &dyn DbExecutor,
309        clock_fn: C,
310        locked_execution: LockedExecution,
311        retry_config: ComponentRetryConfig,
312        worker_span: Span,
313    ) -> Result<(), DbErrorWrite> {
314        debug!("Worker::run starting");
315        trace!(
316            version = %locked_execution.next_version,
317            params = ?locked_execution.params,
318            event_history = ?locked_execution.event_history,
319            "Worker::run starting"
320        );
321        let can_be_retried = ExecutionLog::can_be_retried_after(
322            locked_execution.intermittent_event_count + 1,
323            retry_config.max_retries,
324            retry_config.retry_exp_backoff,
325        );
326        let unlock_expiry_on_limit_reached =
327            ExecutionLog::compute_retry_duration_when_retrying_forever(
328                locked_execution.intermittent_event_count + 1,
329                retry_config.retry_exp_backoff,
330            );
331        let ctx = WorkerContext {
332            execution_id: locked_execution.execution_id.clone(),
333            metadata: locked_execution.metadata,
334            ffqn: locked_execution.ffqn,
335            params: locked_execution.params,
336            event_history: locked_execution.event_history,
337            responses: locked_execution
338                .responses
339                .into_iter()
340                .map(|outer| outer.event)
341                .collect(),
342            version: locked_execution.next_version,
343            can_be_retried: can_be_retried.is_some(),
344            locked_event: locked_execution.locked_event,
345            worker_span,
346        };
347        let worker_result = worker.run(ctx).await;
348        trace!(?worker_result, "Worker::run finished");
349        let result_obtained_at = clock_fn.now();
350        match Self::worker_result_to_execution_event(
351            locked_execution.execution_id,
352            worker_result,
353            result_obtained_at,
354            locked_execution.parent,
355            can_be_retried,
356            unlock_expiry_on_limit_reached,
357        )? {
358            Some(append) => {
359                trace!("Appending {append:?}");
360                append.append(db_exec).await
361            }
362            None => Ok(()),
363        }
364    }
365
366    /// Map the `WorkerError` to an temporary or a permanent failure.
367    fn worker_result_to_execution_event(
368        execution_id: ExecutionId,
369        worker_result: WorkerResult,
370        result_obtained_at: DateTime<Utc>,
371        parent: Option<(ExecutionId, JoinSetId)>,
372        can_be_retried: Option<Duration>,
373        unlock_expiry_on_limit_reached: Duration,
374    ) -> Result<Option<Append>, DbErrorWrite> {
375        Ok(match worker_result {
376            WorkerResult::Ok(result, new_version, http_client_traces) => {
377                info!(
378                    "Execution finished: {}",
379                    result.as_pending_state_finished_result()
380                );
381                let child_finished =
382                    parent.map(
383                        |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
384                            parent_execution_id,
385                            parent_join_set,
386                            result: result.clone(),
387                        },
388                    );
389                let primary_event = AppendRequest {
390                    created_at: result_obtained_at,
391                    event: ExecutionEventInner::Finished {
392                        result,
393                        http_client_traces,
394                    },
395                };
396
397                Some(Append {
398                    created_at: result_obtained_at,
399                    primary_event,
400                    execution_id,
401                    version: new_version,
402                    child_finished,
403                })
404            }
405            WorkerResult::DbUpdatedByWorkerOrWatcher => None,
406            WorkerResult::Err(err) => {
407                let reason_full = err.to_string(); // WorkerError.display() usually contains a variant specific prefix + inner `reason`.
408                let (primary_event, child_finished, version) = match err {
409                    WorkerError::TemporaryTimeout {
410                        http_client_traces,
411                        version,
412                    } => {
413                        if let Some(duration) = can_be_retried {
414                            let backoff_expires_at = result_obtained_at + duration;
415                            info!(
416                                "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
417                            );
418                            (
419                                ExecutionEventInner::TemporarilyTimedOut {
420                                    backoff_expires_at,
421                                    http_client_traces,
422                                },
423                                None,
424                                version,
425                            )
426                        } else {
427                            info!("Permanent timeout");
428                            let result = SupportedFunctionReturnValue::ExecutionError(
429                                FinishedExecutionError::PermanentTimeout,
430                            );
431                            let child_finished =
432                                parent.map(|(parent_execution_id, parent_join_set)| {
433                                    ChildFinishedResponse {
434                                        parent_execution_id,
435                                        parent_join_set,
436                                        result: result.clone(),
437                                    }
438                                });
439                            (
440                                ExecutionEventInner::Finished {
441                                    result,
442                                    http_client_traces,
443                                },
444                                child_finished,
445                                version,
446                            )
447                        }
448                    }
449                    WorkerError::DbError(db_error) => {
450                        return Err(db_error);
451                    }
452                    WorkerError::ActivityTrap {
453                        reason: reason_inner,
454                        trap_kind,
455                        detail,
456                        version,
457                        http_client_traces,
458                    } => retry_or_fail(
459                        result_obtained_at,
460                        parent,
461                        can_be_retried,
462                        reason_full,
463                        reason_inner,
464                        trap_kind, // short reason like `trap` for logs
465                        detail,
466                        version,
467                        http_client_traces,
468                    ),
469                    WorkerError::ActivityPreopenedDirError {
470                        reason_kind,
471                        reason_inner,
472                        version,
473                    } => retry_or_fail(
474                        result_obtained_at,
475                        parent,
476                        can_be_retried,
477                        reason_full,
478                        reason_inner,
479                        reason_kind, // short reason for logs
480                        None,        // detail
481                        version,
482                        None, // http_client_traces
483                    ),
484                    WorkerError::ActivityReturnedError {
485                        detail,
486                        version,
487                        http_client_traces,
488                    } => {
489                        let duration = can_be_retried.expect(
490                            "ActivityReturnedError must not be returned when retries are exhausted",
491                        );
492                        let expires_at = result_obtained_at + duration;
493                        debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
494                        (
495                            ExecutionEventInner::TemporarilyFailed {
496                                backoff_expires_at: expires_at,
497                                reason_full: StrVariant::Static("activity returned error"), // is same as the variant's display message.
498                                reason_inner: StrVariant::Static("activity returned error"),
499                                detail, // contains the backtrace
500                                http_client_traces,
501                            },
502                            None,
503                            version,
504                        )
505                    }
506                    WorkerError::LimitReached {
507                        reason: inner_reason,
508                        version: new_version,
509                    } => {
510                        let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
511                        warn!(
512                            "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
513                        );
514                        (
515                            ExecutionEventInner::Unlocked {
516                                backoff_expires_at: expires_at,
517                                reason: StrVariant::from(reason_full),
518                            },
519                            None,
520                            new_version,
521                        )
522                    }
523                    WorkerError::FatalError(fatal_error, version) => {
524                        warn!("Fatal worker error - {fatal_error:?}");
525                        let result = SupportedFunctionReturnValue::ExecutionError(
526                            FinishedExecutionError::from(fatal_error),
527                        );
528                        let child_finished =
529                            parent.map(|(parent_execution_id, parent_join_set)| {
530                                ChildFinishedResponse {
531                                    parent_execution_id,
532                                    parent_join_set,
533                                    result: result.clone(),
534                                }
535                            });
536                        (
537                            ExecutionEventInner::Finished {
538                                result,
539                                http_client_traces: None,
540                            },
541                            child_finished,
542                            version,
543                        )
544                    }
545                };
546                Some(Append {
547                    created_at: result_obtained_at,
548                    primary_event: AppendRequest {
549                        created_at: result_obtained_at,
550                        event: primary_event,
551                    },
552                    execution_id,
553                    version,
554                    child_finished,
555                })
556            }
557        })
558    }
559}
560
561#[expect(clippy::too_many_arguments)]
562fn retry_or_fail(
563    result_obtained_at: DateTime<Utc>,
564    parent: Option<(ExecutionId, JoinSetId)>,
565    can_be_retried: Option<Duration>,
566    reason_full: String,
567    reason_inner: String,
568    err_kind_for_logs: impl Display,
569    detail: Option<String>,
570    version: Version,
571    http_client_traces: Option<Vec<concepts::storage::http_client_trace::HttpClientTrace>>,
572) -> (ExecutionEventInner, Option<ChildFinishedResponse>, Version) {
573    if let Some(duration) = can_be_retried {
574        let expires_at = result_obtained_at + duration;
575        debug!(
576            "Retrying activity with `{err_kind_for_logs}` execution after {duration:?} at {expires_at}"
577        );
578        (
579            ExecutionEventInner::TemporarilyFailed {
580                backoff_expires_at: expires_at,
581                reason_full: StrVariant::from(reason_full),
582                reason_inner: StrVariant::from(reason_inner),
583                detail,
584                http_client_traces,
585            },
586            None,
587            version,
588        )
589    } else {
590        info!("Activity with `{err_kind_for_logs}` marked as permanent failure - {reason_inner}");
591        let result = SupportedFunctionReturnValue::ExecutionError(
592            FinishedExecutionError::PermanentFailure {
593                reason_inner,
594                reason_full,
595                kind: PermanentFailureKind::ActivityTrap,
596                detail,
597            },
598        );
599
600        let child_finished =
601            parent.map(
602                |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
603                    parent_execution_id,
604                    parent_join_set,
605                    result: result.clone(),
606                },
607            );
608        (
609            ExecutionEventInner::Finished {
610                result,
611                http_client_traces,
612            },
613            child_finished,
614            version,
615        )
616    }
617}
618
/// Data needed to tell a parent execution that one of its child executions has finished.
#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    /// Execution that receives the response.
    pub(crate) parent_execution_id: ExecutionId,
    /// Join set of the parent that the response is addressed to.
    pub(crate) parent_join_set: JoinSetId,
    /// Result of the finished child, forwarded to the parent.
    pub(crate) result: SupportedFunctionReturnValue,
}
625
/// A pending database write produced from a worker result: one event for the execution and,
/// optionally, a response for its parent.
#[derive(Debug, Clone)]
pub(crate) struct Append {
    /// Timestamp used for the parent response event and passed to the batched append.
    pub(crate) created_at: DateTime<Utc>,
    /// Event appended to the execution identified by `execution_id`.
    pub(crate) primary_event: AppendRequest,
    pub(crate) execution_id: ExecutionId,
    /// Version passed to the database together with `primary_event`; also reported to the
    /// parent as the finished version.
    pub(crate) version: Version,
    /// When set, the parent is notified in the same database call; `append` then requires
    /// `primary_event` to be a `Finished` event and `execution_id` to be a derived id.
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}
634
635impl Append {
636    pub(crate) async fn append(self, db_exec: &dyn DbExecutor) -> Result<(), DbErrorWrite> {
637        if let Some(child_finished) = self.child_finished {
638            assert_matches!(
639                &self.primary_event,
640                AppendRequest {
641                    event: ExecutionEventInner::Finished { .. },
642                    ..
643                }
644            );
645            let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
646            let events = AppendEventsToExecution {
647                execution_id: self.execution_id,
648                version: self.version.clone(),
649                batch: vec![self.primary_event],
650            };
651            let response = AppendResponseToExecution {
652                parent_execution_id: child_finished.parent_execution_id,
653                parent_response_event: JoinSetResponseEventOuter {
654                    created_at: self.created_at,
655                    event: JoinSetResponseEvent {
656                        join_set_id: child_finished.parent_join_set,
657                        event: JoinSetResponse::ChildExecutionFinished {
658                            child_execution_id: derived,
659                            // Since self.primary_event is a finished event, the version will remain the same.
660                            finished_version: self.version,
661                            result: child_finished.result,
662                        },
663                    },
664                },
665            };
666
667            db_exec
668                .append_batch_respond_to_parent(events, response, self.created_at)
669                .await?;
670        } else {
671            db_exec
672                .append(self.execution_id, self.version, self.primary_event)
673                .await?;
674        }
675        Ok(())
676    }
677}
678
/// Scripted worker for tests: every run pops the next prepared result and checks that the
/// execution arrived in the expected state.
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes, RETURN_TYPE_DUMMY,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");

    /// Scripted results keyed by the version expected at the start of the run, together with
    /// the expected event history. Entries are consumed from the back.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    /// Metadata of the single function exported by a `SimpleWorker`.
    fn single_export(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
        [FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: RETURN_TYPE_DUMMY,
            extension: None,
            submittable: true,
        }]
    }

    impl SimpleWorker {
        /// Worker with exactly one scripted result, expected at version 2 with an empty
        /// event history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let scripted = IndexMap::from([(Version::new(2), (vec![], res))]);
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(scripted)))
        }

        /// Same scripted results, but exporting `ffqn` instead of the current function.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            let exported = single_export(ffqn.clone());
            Self {
                worker_results_rev: self.worker_results_rev,
                ffqn,
                exported,
            }
        }

        /// Worker exporting `FFQN_SOME` that replays the given scripted results.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: single_export(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        /// Pops the last scripted entry, asserts that the version and event history of `ctx`
        /// match it, and returns the scripted result.
        ///
        /// Panics when no scripted entry is left or when an expectation does not hold.
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            let (expected_version, (expected_eh, worker_result)) = {
                // Keep the lock only for the pop so failing assertions below cannot poison it.
                let mut scripted = self.worker_results_rev.lock().unwrap();
                scripted.pop().unwrap()
            };
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}
758
759#[cfg(test)]
760mod tests {
761    use self::simple_worker::SimpleWorker;
762    use super::*;
763    use crate::{expired_timers_watcher, worker::WorkerResult};
764    use assert_matches::assert_matches;
765    use async_trait::async_trait;
766    use concepts::storage::{CreateRequest, DbConnection, JoinSetRequest};
767    use concepts::storage::{DbPoolCloseable, LockedBy};
768    use concepts::storage::{ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState};
769    use concepts::time::Now;
770    use concepts::{
771        ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, RETURN_TYPE_DUMMY,
772        SUPPORTED_RETURN_VALUE_OK_EMPTY, StrVariant, SupportedFunctionReturnValue, TrapKind,
773    };
774    use db_tests::Database;
775    use indexmap::IndexMap;
776    use simple_worker::FFQN_SOME;
777    use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
778    use test_utils::set_up;
779    use test_utils::sim_clock::{ConstClock, SimClock};
780
781    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
782
783    async fn tick_fn<W: Worker + Debug, C: ClockFn + 'static>(
784        config: ExecConfig,
785        clock_fn: C,
786        db_exec: Arc<dyn DbExecutor>,
787        worker: Arc<W>,
788        executed_at: DateTime<Utc>,
789    ) -> Vec<ExecutionId> {
790        trace!("Ticking with {worker:?}");
791        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
792        let executor = ExecTask::new_test(worker, config, clock_fn, db_exec, ffqns);
793        executor
794            .tick_test_await(executed_at, RunId::generate())
795            .await
796    }
797
798    #[tokio::test]
799    async fn execute_simple_lifecycle_tick_based_mem() {
800        let created_at = Now.now();
801        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
802        execute_simple_lifecycle_tick_based(
803            db_pool.connection().as_ref(),
804            db_exec.clone(),
805            ConstClock(created_at),
806        )
807        .await;
808        db_close.close().await;
809    }
810
811    #[tokio::test]
812    async fn execute_simple_lifecycle_tick_based_sqlite() {
813        let created_at = Now.now();
814        let (_guard, db_pool, db_exec, db_close) = Database::Sqlite.set_up().await;
815        execute_simple_lifecycle_tick_based(
816            db_pool.connection().as_ref(),
817            db_exec.clone(),
818            ConstClock(created_at),
819        )
820        .await;
821        db_close.close().await;
822    }
823
824    async fn execute_simple_lifecycle_tick_based<C: ClockFn + 'static>(
825        db_connection: &dyn DbConnection,
826        db_exec: Arc<dyn DbExecutor>,
827        clock_fn: C,
828    ) {
829        set_up();
830        let created_at = clock_fn.now();
831        let exec_config = ExecConfig {
832            batch_size: 1,
833            lock_expiry: Duration::from_secs(1),
834            tick_sleep: Duration::from_millis(100),
835            component_id: ComponentId::dummy_activity(),
836            task_limiter: None,
837            executor_id: ExecutorId::generate(),
838            retry_config: ComponentRetryConfig::ZERO,
839        };
840
841        let execution_log = create_and_tick(
842            CreateAndTickConfig {
843                execution_id: ExecutionId::generate(),
844                created_at,
845                executed_at: created_at,
846            },
847            clock_fn,
848            db_connection,
849            db_exec,
850            exec_config,
851            Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
852                SUPPORTED_RETURN_VALUE_OK_EMPTY,
853                Version::new(2),
854                None,
855            ))),
856            tick_fn,
857        )
858        .await;
859        assert_matches!(
860            execution_log.events.get(2).unwrap(),
861            ExecutionEvent {
862                event: ExecutionEventInner::Finished {
863                    result: SupportedFunctionReturnValue::Ok { ok: None },
864                    http_client_traces: None
865                },
866                created_at: _,
867                backtrace_id: None,
868            }
869        );
870    }
871
872    #[tokio::test]
873    async fn execute_simple_lifecycle_task_based_mem() {
874        set_up();
875        let created_at = Now.now();
876        let clock_fn = ConstClock(created_at);
877        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
878        let exec_config = ExecConfig {
879            batch_size: 1,
880            lock_expiry: Duration::from_secs(1),
881            tick_sleep: Duration::ZERO,
882            component_id: ComponentId::dummy_activity(),
883            task_limiter: None,
884            executor_id: ExecutorId::generate(),
885            retry_config: ComponentRetryConfig::ZERO,
886        };
887
888        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
889            SUPPORTED_RETURN_VALUE_OK_EMPTY,
890            Version::new(2),
891            None,
892        )));
893
894        let execution_log = create_and_tick(
895            CreateAndTickConfig {
896                execution_id: ExecutionId::generate(),
897                created_at,
898                executed_at: created_at,
899            },
900            clock_fn,
901            db_pool.connection().as_ref(),
902            db_exec,
903            exec_config,
904            worker,
905            tick_fn,
906        )
907        .await;
908        assert_matches!(
909            execution_log.events.get(2).unwrap(),
910            ExecutionEvent {
911                event: ExecutionEventInner::Finished {
912                    result: SupportedFunctionReturnValue::Ok { ok: None },
913                    http_client_traces: None
914                },
915                created_at: _,
916                backtrace_id: None,
917            }
918        );
919        db_close.close().await;
920    }
921
/// Input of `create_and_tick`: which execution to create and the timestamps to use.
struct CreateAndTickConfig {
    /// ID of the execution that gets created and whose log is fetched afterwards.
    execution_id: ExecutionId,
    /// Used both as `created_at` and as `scheduled_at` of the create request.
    created_at: DateTime<Utc>,
    /// Timestamp handed to the tick function.
    executed_at: DateTime<Utc>,
}
927
928    async fn create_and_tick<
929        W: Worker,
930        C: ClockFn,
931        T: FnMut(ExecConfig, C, Arc<dyn DbExecutor>, Arc<W>, DateTime<Utc>) -> F,
932        F: Future<Output = Vec<ExecutionId>>,
933    >(
934        config: CreateAndTickConfig,
935        clock_fn: C,
936        db_connection: &dyn DbConnection,
937        db_exec: Arc<dyn DbExecutor>,
938        exec_config: ExecConfig,
939        worker: Arc<W>,
940        mut tick: T,
941    ) -> ExecutionLog {
942        // Create an execution
943        db_connection
944            .create(CreateRequest {
945                created_at: config.created_at,
946                execution_id: config.execution_id.clone(),
947                ffqn: FFQN_SOME,
948                params: Params::empty(),
949                parent: None,
950                metadata: concepts::ExecutionMetadata::empty(),
951                scheduled_at: config.created_at,
952                component_id: ComponentId::dummy_activity(),
953                scheduled_by: None,
954            })
955            .await
956            .unwrap();
957        // execute!
958        tick(exec_config, clock_fn, db_exec, worker, config.executed_at).await;
959        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
960        debug!("Execution history after tick: {execution_log:?}");
961        // check that DB contains Created and Locked events.
962        assert_matches!(
963            execution_log.events.first().unwrap(),
964            ExecutionEvent {
965                event: ExecutionEventInner::Created { .. },
966                created_at: actually_created_at,
967                backtrace_id: None,
968            }
969            if config.created_at == *actually_created_at
970        );
971        let locked_at = assert_matches!(
972            execution_log.events.get(1).unwrap(),
973            ExecutionEvent {
974                event: ExecutionEventInner::Locked { .. },
975                created_at: locked_at,
976                backtrace_id: None,
977            } if config.created_at <= *locked_at
978            => *locked_at
979        );
980        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
981            event: _,
982            created_at: executed_at,
983            backtrace_id: None,
984        } if *executed_at >= locked_at);
985        execution_log
986    }
987
988    #[tokio::test]
989    async fn activity_trap_should_trigger_an_execution_retry() {
990        set_up();
991        let sim_clock = SimClock::default();
992        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
993        let retry_exp_backoff = Duration::from_millis(100);
994        let retry_config = ComponentRetryConfig {
995            max_retries: 1,
996            retry_exp_backoff,
997        };
998        let exec_config = ExecConfig {
999            batch_size: 1,
1000            lock_expiry: Duration::from_secs(1),
1001            tick_sleep: Duration::ZERO,
1002            component_id: ComponentId::dummy_activity(),
1003            task_limiter: None,
1004            executor_id: ExecutorId::generate(),
1005            retry_config,
1006        };
1007        let expected_reason = "error reason";
1008        let expected_detail = "error detail";
1009        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1010            WorkerError::ActivityTrap {
1011                reason: expected_reason.to_string(),
1012                trap_kind: concepts::TrapKind::Trap,
1013                detail: Some(expected_detail.to_string()),
1014                version: Version::new(2),
1015                http_client_traces: None,
1016            },
1017        )));
1018        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
1019        let execution_log = create_and_tick(
1020            CreateAndTickConfig {
1021                execution_id: ExecutionId::generate(),
1022                created_at: sim_clock.now(),
1023                executed_at: sim_clock.now(),
1024            },
1025            sim_clock.clone(),
1026            db_pool.connection().as_ref(),
1027            db_exec.clone(),
1028            exec_config.clone(),
1029            worker,
1030            tick_fn,
1031        )
1032        .await;
1033        assert_eq!(3, execution_log.events.len());
1034        {
1035            let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
1036                &execution_log.events.get(2).unwrap(),
1037                ExecutionEvent {
1038                    event: ExecutionEventInner::TemporarilyFailed {
1039                        reason_inner,
1040                        reason_full,
1041                        detail,
1042                        backoff_expires_at,
1043                        http_client_traces: None,
1044                    },
1045                    created_at: at,
1046                    backtrace_id: None,
1047                }
1048                => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
1049            );
1050            assert_eq!(expected_reason, reason_inner.deref());
1051            assert_eq!(
1052                format!("activity trap: {expected_reason}"),
1053                reason_full.deref()
1054            );
1055            assert_eq!(Some(expected_detail), detail.as_deref());
1056            assert_eq!(at, sim_clock.now());
1057            assert_eq!(sim_clock.now() + retry_config.retry_exp_backoff, expires_at);
1058        }
1059        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
1060            std::sync::Mutex::new(IndexMap::from([(
1061                Version::new(4),
1062                (
1063                    vec![],
1064                    WorkerResult::Ok(SUPPORTED_RETURN_VALUE_OK_EMPTY, Version::new(4), None),
1065                ),
1066            )])),
1067        )));
1068        // noop until `retry_exp_backoff` expires
1069        assert!(
1070            tick_fn(
1071                exec_config.clone(),
1072                sim_clock.clone(),
1073                db_exec.clone(),
1074                worker.clone(),
1075                sim_clock.now(),
1076            )
1077            .await
1078            .is_empty()
1079        );
1080        // tick again to finish the execution
1081        sim_clock.move_time_forward(retry_config.retry_exp_backoff);
1082        tick_fn(
1083            exec_config,
1084            sim_clock.clone(),
1085            db_exec.clone(),
1086            worker,
1087            sim_clock.now(),
1088        )
1089        .await;
1090        let execution_log = {
1091            let db_connection = db_pool.connection();
1092            db_connection
1093                .get(&execution_log.execution_id)
1094                .await
1095                .unwrap()
1096        };
1097        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
1098        assert_matches!(
1099            execution_log.events.get(3).unwrap(),
1100            ExecutionEvent {
1101                event: ExecutionEventInner::Locked { .. },
1102                created_at: at,
1103                backtrace_id: None,
1104            } if *at == sim_clock.now()
1105        );
1106        assert_matches!(
1107            execution_log.events.get(4).unwrap(),
1108            ExecutionEvent {
1109                event: ExecutionEventInner::Finished {
1110                    result: SupportedFunctionReturnValue::Ok{ok:None},
1111                    http_client_traces: None
1112                },
1113                created_at: finished_at,
1114                backtrace_id: None,
1115            } if *finished_at == sim_clock.now()
1116        );
1117        db_close.close().await;
1118    }
1119
1120    #[tokio::test]
1121    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1122        set_up();
1123        let created_at = Now.now();
1124        let clock_fn = ConstClock(created_at);
1125        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1126        let exec_config = ExecConfig {
1127            batch_size: 1,
1128            lock_expiry: Duration::from_secs(1),
1129            tick_sleep: Duration::ZERO,
1130            component_id: ComponentId::dummy_activity(),
1131            task_limiter: None,
1132            executor_id: ExecutorId::generate(),
1133            retry_config: ComponentRetryConfig::ZERO,
1134        };
1135
1136        let expected_reason = "error reason";
1137        let expected_detail = "error detail";
1138        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1139            WorkerError::ActivityTrap {
1140                reason: expected_reason.to_string(),
1141                trap_kind: concepts::TrapKind::Trap,
1142                detail: Some(expected_detail.to_string()),
1143                version: Version::new(2),
1144                http_client_traces: None,
1145            },
1146        )));
1147        let execution_log = create_and_tick(
1148            CreateAndTickConfig {
1149                execution_id: ExecutionId::generate(),
1150                created_at,
1151                executed_at: created_at,
1152            },
1153            clock_fn,
1154            db_pool.connection().as_ref(),
1155            db_exec,
1156            exec_config.clone(),
1157            worker,
1158            tick_fn,
1159        )
1160        .await;
1161        assert_eq!(3, execution_log.events.len());
1162        let (reason, kind, detail) = assert_matches!(
1163            &execution_log.events.get(2).unwrap(),
1164            ExecutionEvent {
1165                event: ExecutionEventInner::Finished{
1166                    result: SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_}),
1167                    http_client_traces: None
1168                },
1169                created_at: at,
1170                backtrace_id: None,
1171            } if *at == created_at
1172            => (reason_inner, kind, detail)
1173        );
1174        assert_eq!(expected_reason, *reason);
1175        assert_eq!(Some(expected_detail), detail.as_deref());
1176        assert_eq!(PermanentFailureKind::ActivityTrap, *kind);
1177
1178        db_close.close().await;
1179    }
1180
1181    #[tokio::test]
1182    async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1183        let worker_error = WorkerError::ActivityTrap {
1184            reason: "error reason".to_string(),
1185            trap_kind: TrapKind::Trap,
1186            detail: Some("detail".to_string()),
1187            version: Version::new(2),
1188            http_client_traces: None,
1189        };
1190        let expected_child_err = FinishedExecutionError::PermanentFailure {
1191            reason_full: "activity trap: error reason".to_string(),
1192            reason_inner: "error reason".to_string(),
1193            kind: PermanentFailureKind::ActivityTrap,
1194            detail: Some("detail".to_string()),
1195        };
1196        child_execution_permanently_failed_should_notify_parent(
1197            WorkerResult::Err(worker_error),
1198            expected_child_err,
1199        )
1200        .await;
1201    }
1202
1203    // TODO: Add test for WorkerError::TemporaryTimeout
1204
1205    #[tokio::test]
1206    async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1207        let expected_child_err = FinishedExecutionError::PermanentTimeout;
1208        child_execution_permanently_failed_should_notify_parent(
1209            WorkerResult::DbUpdatedByWorkerOrWatcher,
1210            expected_child_err,
1211        )
1212        .await;
1213    }
1214
1215    async fn child_execution_permanently_failed_should_notify_parent(
1216        worker_result: WorkerResult,
1217        expected_child_err: FinishedExecutionError,
1218    ) {
1219        use concepts::storage::JoinSetResponseEventOuter;
1220        const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1221
1222        set_up();
1223        let sim_clock = SimClock::default();
1224        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1225
1226        let parent_worker = Arc::new(SimpleWorker::with_single_result(
1227            WorkerResult::DbUpdatedByWorkerOrWatcher,
1228        ));
1229        let parent_execution_id = ExecutionId::generate();
1230        db_pool
1231            .connection()
1232            .create(CreateRequest {
1233                created_at: sim_clock.now(),
1234                execution_id: parent_execution_id.clone(),
1235                ffqn: FFQN_SOME,
1236                params: Params::empty(),
1237                parent: None,
1238                metadata: concepts::ExecutionMetadata::empty(),
1239                scheduled_at: sim_clock.now(),
1240                component_id: ComponentId::dummy_activity(),
1241                scheduled_by: None,
1242            })
1243            .await
1244            .unwrap();
1245        let parent_executor_id = ExecutorId::generate();
1246        tick_fn(
1247            ExecConfig {
1248                batch_size: 1,
1249                lock_expiry: LOCK_EXPIRY,
1250                tick_sleep: Duration::ZERO,
1251                component_id: ComponentId::dummy_activity(),
1252                task_limiter: None,
1253                executor_id: parent_executor_id,
1254                retry_config: ComponentRetryConfig::ZERO,
1255            },
1256            sim_clock.clone(),
1257            db_exec.clone(),
1258            parent_worker,
1259            sim_clock.now(),
1260        )
1261        .await;
1262
1263        let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1264        let child_execution_id = parent_execution_id.next_level(&join_set_id);
1265        // executor does not append anything, this should have been written by the worker:
1266        {
1267            let params = Params::empty();
1268            let child = CreateRequest {
1269                created_at: sim_clock.now(),
1270                execution_id: ExecutionId::Derived(child_execution_id.clone()),
1271                ffqn: FFQN_CHILD,
1272                params: params.clone(),
1273                parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1274                metadata: concepts::ExecutionMetadata::empty(),
1275                scheduled_at: sim_clock.now(),
1276                component_id: ComponentId::dummy_activity(),
1277                scheduled_by: None,
1278            };
1279            let current_time = sim_clock.now();
1280            let join_set = AppendRequest {
1281                created_at: current_time,
1282                event: ExecutionEventInner::HistoryEvent {
1283                    event: HistoryEvent::JoinSetCreate {
1284                        join_set_id: join_set_id.clone(),
1285                        closing_strategy: ClosingStrategy::Complete,
1286                    },
1287                },
1288            };
1289            let child_exec_req = AppendRequest {
1290                created_at: current_time,
1291                event: ExecutionEventInner::HistoryEvent {
1292                    event: HistoryEvent::JoinSetRequest {
1293                        join_set_id: join_set_id.clone(),
1294                        request: JoinSetRequest::ChildExecutionRequest {
1295                            child_execution_id: child_execution_id.clone(),
1296                            target_ffqn: FFQN_CHILD,
1297                            params,
1298                        },
1299                    },
1300                },
1301            };
1302            let join_next = AppendRequest {
1303                created_at: current_time,
1304                event: ExecutionEventInner::HistoryEvent {
1305                    event: HistoryEvent::JoinNext {
1306                        join_set_id: join_set_id.clone(),
1307                        run_expires_at: sim_clock.now(),
1308                        closing: false,
1309                        requested_ffqn: Some(FFQN_CHILD),
1310                    },
1311                },
1312            };
1313            db_pool
1314                .connection()
1315                .append_batch_create_new_execution(
1316                    current_time,
1317                    vec![join_set, child_exec_req, join_next],
1318                    parent_execution_id.clone(),
1319                    Version::new(2),
1320                    vec![child],
1321                )
1322                .await
1323                .unwrap();
1324        }
1325
1326        let child_worker =
1327            Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));
1328
1329        // execute the child
1330        tick_fn(
1331            ExecConfig {
1332                batch_size: 1,
1333                lock_expiry: LOCK_EXPIRY,
1334                tick_sleep: Duration::ZERO,
1335                component_id: ComponentId::dummy_activity(),
1336                task_limiter: None,
1337                executor_id: ExecutorId::generate(),
1338                retry_config: ComponentRetryConfig::ZERO,
1339            },
1340            sim_clock.clone(),
1341            db_exec.clone(),
1342            child_worker,
1343            sim_clock.now(),
1344        )
1345        .await;
1346        if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
1347            // In case of timeout, let the timers watcher handle it
1348            sim_clock.move_time_forward(LOCK_EXPIRY);
1349            expired_timers_watcher::tick(db_pool.connection().as_ref(), sim_clock.now())
1350                .await
1351                .unwrap();
1352        }
1353        let child_log = db_pool
1354            .connection()
1355            .get(&ExecutionId::Derived(child_execution_id.clone()))
1356            .await
1357            .unwrap();
1358        assert!(child_log.pending_state.is_finished());
1359        assert_eq!(
1360            Version(2),
1361            child_log.next_version,
1362            "created = 0, locked = 1, with_single_result = 2"
1363        );
1364        assert_eq!(
1365            ExecutionEventInner::Finished {
1366                result: SupportedFunctionReturnValue::ExecutionError(expected_child_err),
1367                http_client_traces: None
1368            },
1369            child_log.last_event().event
1370        );
1371        let parent_log = db_pool
1372            .connection()
1373            .get(&parent_execution_id)
1374            .await
1375            .unwrap();
1376        assert_matches!(
1377            parent_log.pending_state,
1378            PendingState::PendingAt {
1379                scheduled_at,
1380                last_lock: Some(LockedBy { executor_id: found_executor_id, run_id: _}),
1381            } if scheduled_at == sim_clock.now() && found_executor_id == parent_executor_id,
1382            "parent should be back to pending"
1383        );
1384        let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1385            parent_log.responses.last(),
1386            Some(JoinSetResponseEventOuter{
1387                created_at: at,
1388                event: JoinSetResponseEvent{
1389                    join_set_id: found_join_set_id,
1390                    event: JoinSetResponse::ChildExecutionFinished {
1391                        child_execution_id: found_child_execution_id,
1392                        finished_version,
1393                        result: found_result,
1394                    }
1395                }
1396            })
1397             if *at == sim_clock.now()
1398            => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1399        );
1400        assert_eq!(join_set_id, *found_join_set_id);
1401        assert_eq!(child_execution_id, *found_child_execution_id);
1402        assert_eq!(child_log.next_version, *child_finished_version);
1403        assert_matches!(
1404            found_result,
1405            SupportedFunctionReturnValue::ExecutionError(_)
1406        );
1407
1408        db_close.close().await;
1409    }
1410
1411    #[derive(Clone, Debug)]
1412    struct SleepyWorker {
1413        duration: Duration,
1414        result: SupportedFunctionReturnValue,
1415        exported: [FunctionMetadata; 1],
1416    }
1417
1418    #[async_trait]
1419    impl Worker for SleepyWorker {
1420        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1421            tokio::time::sleep(self.duration).await;
1422            WorkerResult::Ok(self.result.clone(), ctx.version, None)
1423        }
1424
1425        fn exported_functions(&self) -> &[FunctionMetadata] {
1426            &self.exported
1427        }
1428    }
1429
1430    #[tokio::test]
1431    async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1432        set_up();
1433        let sim_clock = SimClock::default();
1434        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1435        let lock_expiry = Duration::from_millis(100);
1436        let timeout_duration = Duration::from_millis(300);
1437        let retry_config = ComponentRetryConfig {
1438            max_retries: 1,
1439            retry_exp_backoff: timeout_duration,
1440        };
1441        let exec_config = ExecConfig {
1442            batch_size: 1,
1443            lock_expiry,
1444            tick_sleep: Duration::ZERO,
1445            component_id: ComponentId::dummy_activity(),
1446            task_limiter: None,
1447            executor_id: ExecutorId::generate(),
1448            retry_config,
1449        };
1450
1451        let worker = Arc::new(SleepyWorker {
1452            duration: lock_expiry + Duration::from_millis(1), // sleep more than allowed by the lock expiry
1453            result: SUPPORTED_RETURN_VALUE_OK_EMPTY,
1454            exported: [FunctionMetadata {
1455                ffqn: FFQN_SOME,
1456                parameter_types: ParameterTypes::default(),
1457                return_type: RETURN_TYPE_DUMMY,
1458                extension: None,
1459                submittable: true,
1460            }],
1461        });
1462        // Create an execution
1463        let execution_id = ExecutionId::generate();
1464        let db_connection = db_pool.connection();
1465        db_connection
1466            .create(CreateRequest {
1467                created_at: sim_clock.now(),
1468                execution_id: execution_id.clone(),
1469                ffqn: FFQN_SOME,
1470                params: Params::empty(),
1471                parent: None,
1472                metadata: concepts::ExecutionMetadata::empty(),
1473                scheduled_at: sim_clock.now(),
1474                component_id: ComponentId::dummy_activity(),
1475                scheduled_by: None,
1476            })
1477            .await
1478            .unwrap();
1479
1480        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
1481        let executor = ExecTask::new_test(
1482            worker,
1483            exec_config.clone(),
1484            sim_clock.clone(),
1485            db_exec.clone(),
1486            ffqns,
1487        );
1488        let mut first_execution_progress = executor
1489            .tick(sim_clock.now(), RunId::generate())
1490            .await
1491            .unwrap();
1492        assert_eq!(1, first_execution_progress.executions.len());
1493        // Started hanging, wait for lock expiry.
1494        sim_clock.move_time_forward(lock_expiry);
1495        // cleanup should be called
1496        let now_after_first_lock_expiry = sim_clock.now();
1497        {
1498            debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1499            let cleanup_progress = executor
1500                .tick(now_after_first_lock_expiry, RunId::generate())
1501                .await
1502                .unwrap();
1503            assert!(cleanup_progress.executions.is_empty());
1504        }
1505        {
1506            let expired_locks = expired_timers_watcher::tick(
1507                db_pool.connection().as_ref(),
1508                now_after_first_lock_expiry,
1509            )
1510            .await
1511            .unwrap()
1512            .expired_locks;
1513            assert_eq!(1, expired_locks);
1514        }
1515        assert!(
1516            !first_execution_progress
1517                .executions
1518                .pop()
1519                .unwrap()
1520                .1
1521                .is_finished()
1522        );
1523
1524        let execution_log = db_connection.get(&execution_id).await.unwrap();
1525        let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1526        assert_matches!(
1527            &execution_log.events.get(2).unwrap(),
1528            ExecutionEvent {
1529                event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
1530                created_at: at,
1531                backtrace_id: None,
1532            } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1533        );
1534        assert_matches!(
1535            execution_log.pending_state,
1536            PendingState::PendingAt {
1537                scheduled_at: found_scheduled_by,
1538                last_lock: Some(LockedBy {
1539                    executor_id: found_executor_id,
1540                    run_id: _,
1541                }),
1542            } if found_scheduled_by == expected_first_timeout_expiry && found_executor_id == exec_config.executor_id
1543        );
1544        sim_clock.move_time_forward(timeout_duration);
1545        let now_after_first_timeout = sim_clock.now();
1546        debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1547
1548        let mut second_execution_progress = executor
1549            .tick(now_after_first_timeout, RunId::generate())
1550            .await
1551            .unwrap();
1552        assert_eq!(1, second_execution_progress.executions.len());
1553
1554        // Started hanging, wait for lock expiry.
1555        sim_clock.move_time_forward(lock_expiry);
1556        // cleanup should be called
1557        let now_after_second_lock_expiry = sim_clock.now();
1558        debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1559        {
1560            let cleanup_progress = executor
1561                .tick(now_after_second_lock_expiry, RunId::generate())
1562                .await
1563                .unwrap();
1564            assert!(cleanup_progress.executions.is_empty());
1565        }
1566        {
1567            let expired_locks = expired_timers_watcher::tick(
1568                db_pool.connection().as_ref(),
1569                now_after_second_lock_expiry,
1570            )
1571            .await
1572            .unwrap()
1573            .expired_locks;
1574            assert_eq!(1, expired_locks);
1575        }
1576        assert!(
1577            !second_execution_progress
1578                .executions
1579                .pop()
1580                .unwrap()
1581                .1
1582                .is_finished()
1583        );
1584
1585        drop(db_connection);
1586        drop(executor);
1587        db_close.close().await;
1588    }
1589}