obeli_sk_executor/
executor.rs

1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::prefixed_ulid::RunId;
5use concepts::storage::{
6    AppendEventsToExecution, AppendRequest, AppendResponseToExecution, DbErrorGeneric,
7    DbErrorWrite, DbExecutor, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
8    LockedExecution,
9};
10use concepts::time::{ClockFn, Sleep};
11use concepts::{
12    ComponentId, ComponentRetryConfig, FunctionMetadata, StrVariant, SupportedFunctionReturnValue,
13};
14use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
15use concepts::{
16    FinishedExecutionError,
17    storage::{ExecutionEventInner, JoinSetResponse, Version},
18};
19use concepts::{JoinSetId, PermanentFailureKind};
20use std::fmt::Display;
21use std::{
22    sync::{
23        Arc,
24        atomic::{AtomicBool, Ordering},
25    },
26    time::Duration,
27};
28use tokio::task::{AbortHandle, JoinHandle};
29use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
30
/// Configuration of a single executor: how many pending executions it locks per tick,
/// for how long, and how failed executions are retried.
#[derive(Debug, Clone)]
pub struct ExecConfig {
    /// Added to the tick time to compute the lock expiry passed to `DbExecutor::lock_pending`;
    /// the same sum is handed to workers as their execution deadline.
    pub lock_expiry: Duration,
    /// Duration slept by the future handed to `DbExecutor::wait_for_pending` in the
    /// executor loop between ticks.
    pub tick_sleep: Duration,
    /// Upper bound on the number of executions locked in one tick.
    pub batch_size: u32,
    /// Passed to `DbExecutor::lock_pending` and recorded in tracing spans.
    pub component_id: ComponentId,
    /// Optional semaphore limiting concurrently running worker tasks. Each spawned worker
    /// task holds one permit until it finishes; a tick locks at most as many executions
    /// as there are permits immediately available.
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
    /// Identifier of this executor, passed to `DbExecutor::lock_pending` and recorded in spans.
    pub executor_id: ExecutorId,
    /// Passed to `DbExecutor::lock_pending`; its `max_retries` and `retry_exp_backoff` decide
    /// whether (and after how long) a failed execution is retried.
    pub retry_config: ComponentRetryConfig,
}
41
/// Executor that locks pending executions of `ffqns` and runs each of them on `worker`
/// in a spawned Tokio task.
pub struct ExecTask<C: ClockFn> {
    /// Runs the locked executions.
    worker: Arc<dyn Worker>,
    config: ExecConfig,
    clock_fn: C, // Used for obtaining current time when the execution finishes.
    db_exec: Arc<dyn DbExecutor>,
    /// Functions whose pending executions this executor locks.
    ffqns: Arc<[FunctionFqn]>,
}
49
/// Worker tasks spawned by a single executor tick.
#[derive(derive_more::Debug, Default)]
pub struct ExecutionProgress {
    /// Each locked execution paired with the join handle of the task running it.
    /// Only read by the test-only `wait_for_tasks`, hence the `dead_code` allowance.
    #[debug(skip)]
    #[allow(dead_code)]
    executions: Vec<(ExecutionId, JoinHandle<()>)>,
}
56
57impl ExecutionProgress {
58    #[cfg(feature = "test")]
59    pub async fn wait_for_tasks(self) -> Vec<ExecutionId> {
60        let mut vec = Vec::new();
61        for (exe, join_handle) in self.executions {
62            vec.push(exe);
63            join_handle.await.unwrap();
64        }
65        vec
66    }
67}
68
/// Handle to an executor loop spawned by `ExecTask::spawn_new`.
///
/// Dropping the handle aborts the loop if it is still running; `close` stops it gracefully.
#[derive(derive_more::Debug)]
pub struct ExecutorTaskHandle {
    /// Set by `close`; the executor loop checks it before each iteration.
    #[debug(skip)]
    is_closing: Arc<AtomicBool>,
    /// Abort handle of the spawned executor loop task.
    #[debug(skip)]
    abort_handle: AbortHandle,
    /// Recorded in the `close`/`drop` tracing spans.
    component_id: ComponentId,
    /// Recorded in the `close`/`drop` tracing spans.
    executor_id: ExecutorId,
}
78
79impl ExecutorTaskHandle {
80    #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
81    pub async fn close(&self) {
82        trace!("Gracefully closing");
83        // TODO: Close all the workers tasks as well.
84        // Simple solution:
85        // Attempt to unlock all in-progress executions. Activities will not be penalized for shutdown, although the number of retries would increase,
86        // some progress will be lost.
87        // More complex solution:
88        // Activities will be awaited, up to some deadline, then unlocked.
89        // Workflows that are awating should be signaled using `DeadlineTracker`.
90        self.is_closing.store(true, Ordering::Relaxed);
91        while !self.abort_handle.is_finished() {
92            tokio::time::sleep(Duration::from_millis(1)).await;
93        }
94        debug!("Gracefully closed");
95    }
96}
97
98impl Drop for ExecutorTaskHandle {
99    #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
100    fn drop(&mut self) {
101        if self.abort_handle.is_finished() {
102            return;
103        }
104        warn!("Aborting the executor task");
105        self.abort_handle.abort();
106    }
107}
108
/// Test-only wrapper exposing the private `extract_exported_ffqns_noext`: collects the
/// fully-qualified names of all functions exported by `worker`.
#[cfg(feature = "test")]
pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    extract_exported_ffqns_noext(worker)
}
113
114fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
115    worker
116        .exported_functions()
117        .iter()
118        .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
119        .collect::<Arc<_>>()
120}
121
122impl<C: ClockFn + 'static> ExecTask<C> {
123    #[cfg(feature = "test")]
124    pub fn new_test(
125        worker: Arc<dyn Worker>,
126        config: ExecConfig,
127        clock_fn: C,
128        db_exec: Arc<dyn DbExecutor>,
129        ffqns: Arc<[FunctionFqn]>,
130    ) -> Self {
131        Self {
132            worker,
133            config,
134            clock_fn,
135            db_exec,
136            ffqns,
137        }
138    }
139
140    #[cfg(feature = "test")]
141    pub fn new_all_ffqns_test(
142        worker: Arc<dyn Worker>,
143        config: ExecConfig,
144        clock_fn: C,
145        db_exec: Arc<dyn DbExecutor>,
146    ) -> Self {
147        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
148        Self {
149            worker,
150            config,
151            clock_fn,
152            db_exec,
153            ffqns,
154        }
155    }
156
157    pub fn spawn_new(
158        worker: Arc<dyn Worker>,
159        config: ExecConfig,
160        clock_fn: C,
161        db_exec: Arc<dyn DbExecutor>,
162        sleep: impl Sleep + 'static,
163    ) -> ExecutorTaskHandle {
164        let is_closing = Arc::new(AtomicBool::default());
165        let is_closing_inner = is_closing.clone();
166        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
167        let component_id = config.component_id.clone();
168        let executor_id = config.executor_id;
169        let abort_handle = tokio::spawn(async move {
170            debug!(executor_id = %config.executor_id, component_id = %config.component_id, "Spawned executor");
171            let task = ExecTask {
172                worker,
173                config,
174                db_exec,
175                ffqns: ffqns.clone(),
176                clock_fn: clock_fn.clone(),
177            };
178            while !is_closing_inner.load(Ordering::Relaxed) {
179                let _ = task.tick(clock_fn.now(), RunId::generate()).await;
180
181                task.db_exec
182                    .wait_for_pending(clock_fn.now(), ffqns.clone(), {
183                        let sleep = sleep.clone();
184                        Box::pin(async move { sleep.sleep(task.config.tick_sleep).await })})
185                    .await;
186            }
187        })
188        .abort_handle();
189        ExecutorTaskHandle {
190            is_closing,
191            abort_handle,
192            component_id,
193            executor_id,
194        }
195    }
196
197    fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
198        if let Some(task_limiter) = &self.config.task_limiter {
199            let mut locks = Vec::new();
200            for _ in 0..self.config.batch_size {
201                if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
202                    locks.push(Some(permit));
203                } else {
204                    break;
205                }
206            }
207            locks
208        } else {
209            let mut vec = Vec::with_capacity(self.config.batch_size as usize);
210            for _ in 0..self.config.batch_size {
211                vec.push(None);
212            }
213            vec
214        }
215    }
216
217    #[cfg(feature = "test")]
218    pub async fn tick_test(&self, executed_at: DateTime<Utc>, run_id: RunId) -> ExecutionProgress {
219        self.tick(executed_at, run_id).await.unwrap()
220    }
221
222    #[cfg(feature = "test")]
223    pub async fn tick_test_await(
224        &self,
225        executed_at: DateTime<Utc>,
226        run_id: RunId,
227    ) -> Vec<ExecutionId> {
228        self.tick(executed_at, run_id)
229            .await
230            .unwrap()
231            .wait_for_tasks()
232            .await
233    }
234
235    #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.config.executor_id, component_id = %self.config.component_id))]
236    async fn tick(
237        &self,
238        executed_at: DateTime<Utc>,
239        run_id: RunId,
240    ) -> Result<ExecutionProgress, DbErrorGeneric> {
241        let locked_executions = {
242            let mut permits = self.acquire_task_permits();
243            if permits.is_empty() {
244                return Ok(ExecutionProgress::default());
245            }
246            let lock_expires_at = executed_at + self.config.lock_expiry;
247            let locked_executions = self
248                .db_exec
249                .lock_pending(
250                    permits.len(), // batch size
251                    executed_at,   // fetch expiring before now
252                    self.ffqns.clone(),
253                    executed_at, // created at
254                    self.config.component_id.clone(),
255                    self.config.executor_id,
256                    lock_expires_at,
257                    run_id,
258                    self.config.retry_config,
259                )
260                .await?;
261            // Drop permits if too many were allocated.
262            while permits.len() > locked_executions.len() {
263                permits.pop();
264            }
265            assert_eq!(permits.len(), locked_executions.len());
266            locked_executions.into_iter().zip(permits)
267        };
268        let execution_deadline = executed_at + self.config.lock_expiry;
269
270        let mut executions = Vec::with_capacity(locked_executions.len());
271        for (locked_execution, permit) in locked_executions {
272            let execution_id = locked_execution.execution_id.clone();
273            let join_handle = {
274                let worker = self.worker.clone();
275                let db = self.db_exec.clone();
276                let clock_fn = self.clock_fn.clone();
277                let run_id = locked_execution.run_id;
278                let worker_span = info_span!(parent: None, "worker",
279                    "otel.name" = format!("worker {}", locked_execution.ffqn),
280                    %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.config.executor_id, component_id = %self.config.component_id);
281                locked_execution.metadata.enrich(&worker_span);
282                tokio::spawn({
283                    let worker_span2 = worker_span.clone();
284                    let retry_config = self.config.retry_config;
285                    async move {
286                        let _permit = permit;
287                        let res = Self::run_worker(
288                            worker,
289                            db.as_ref(),
290                            execution_deadline,
291                            clock_fn,
292                            locked_execution,
293                            retry_config,
294                            worker_span2,
295                        )
296                        .await;
297                        if let Err(db_error) = res {
298                            error!("Got db error `{db_error:?}`, expecting watcher to mark execution as timed out");
299                        }
300                    }
301                    .instrument(worker_span)
302                })
303            };
304            executions.push((execution_id, join_handle));
305        }
306        Ok(ExecutionProgress { executions })
307    }
308
309    async fn run_worker(
310        worker: Arc<dyn Worker>,
311        db_exec: &dyn DbExecutor,
312        execution_deadline: DateTime<Utc>,
313        clock_fn: C,
314        locked_execution: LockedExecution,
315        retry_config: ComponentRetryConfig,
316        worker_span: Span,
317    ) -> Result<(), DbErrorWrite> {
318        debug!("Worker::run starting");
319        trace!(
320            version = %locked_execution.next_version,
321            params = ?locked_execution.params,
322            event_history = ?locked_execution.event_history,
323            "Worker::run starting"
324        );
325        let can_be_retried = ExecutionLog::can_be_retried_after(
326            locked_execution.intermittent_event_count + 1,
327            retry_config.max_retries,
328            retry_config.retry_exp_backoff,
329        );
330        let unlock_expiry_on_limit_reached =
331            ExecutionLog::compute_retry_duration_when_retrying_forever(
332                locked_execution.intermittent_event_count + 1,
333                retry_config.retry_exp_backoff,
334            );
335        let ctx = WorkerContext {
336            execution_id: locked_execution.execution_id.clone(),
337            metadata: locked_execution.metadata,
338            ffqn: locked_execution.ffqn,
339            params: locked_execution.params,
340            event_history: locked_execution.event_history,
341            responses: locked_execution
342                .responses
343                .into_iter()
344                .map(|outer| outer.event)
345                .collect(),
346            version: locked_execution.next_version,
347            execution_deadline,
348            can_be_retried: can_be_retried.is_some(),
349            run_id: locked_execution.run_id,
350            worker_span,
351        };
352        let worker_result = worker.run(ctx).await;
353        trace!(?worker_result, "Worker::run finished");
354        let result_obtained_at = clock_fn.now();
355        match Self::worker_result_to_execution_event(
356            locked_execution.execution_id,
357            worker_result,
358            result_obtained_at,
359            locked_execution.parent,
360            can_be_retried,
361            unlock_expiry_on_limit_reached,
362        )? {
363            Some(append) => {
364                trace!("Appending {append:?}");
365                append.append(db_exec).await
366            }
367            None => Ok(()),
368        }
369    }
370
371    /// Map the `WorkerError` to an temporary or a permanent failure.
372    fn worker_result_to_execution_event(
373        execution_id: ExecutionId,
374        worker_result: WorkerResult,
375        result_obtained_at: DateTime<Utc>,
376        parent: Option<(ExecutionId, JoinSetId)>,
377        can_be_retried: Option<Duration>,
378        unlock_expiry_on_limit_reached: Duration,
379    ) -> Result<Option<Append>, DbErrorWrite> {
380        Ok(match worker_result {
381            WorkerResult::Ok(result, new_version, http_client_traces) => {
382                info!(
383                    "Execution finished: {}",
384                    result.as_pending_state_finished_result()
385                );
386                let child_finished =
387                    parent.map(
388                        |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
389                            parent_execution_id,
390                            parent_join_set,
391                            result: result.clone(),
392                        },
393                    );
394                let primary_event = AppendRequest {
395                    created_at: result_obtained_at,
396                    event: ExecutionEventInner::Finished {
397                        result,
398                        http_client_traces,
399                    },
400                };
401
402                Some(Append {
403                    created_at: result_obtained_at,
404                    primary_event,
405                    execution_id,
406                    version: new_version,
407                    child_finished,
408                })
409            }
410            WorkerResult::DbUpdatedByWorkerOrWatcher => None,
411            WorkerResult::Err(err) => {
412                let reason_full = err.to_string(); // WorkerError.display() usually contains a variant specific prefix + inner `reason`.
413                let (primary_event, child_finished, version) = match err {
414                    WorkerError::TemporaryTimeout {
415                        http_client_traces,
416                        version,
417                    } => {
418                        if let Some(duration) = can_be_retried {
419                            let backoff_expires_at = result_obtained_at + duration;
420                            info!(
421                                "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
422                            );
423                            (
424                                ExecutionEventInner::TemporarilyTimedOut {
425                                    backoff_expires_at,
426                                    http_client_traces,
427                                },
428                                None,
429                                version,
430                            )
431                        } else {
432                            info!("Permanent timeout");
433                            let result = SupportedFunctionReturnValue::ExecutionError(
434                                FinishedExecutionError::PermanentTimeout,
435                            );
436                            let child_finished =
437                                parent.map(|(parent_execution_id, parent_join_set)| {
438                                    ChildFinishedResponse {
439                                        parent_execution_id,
440                                        parent_join_set,
441                                        result: result.clone(),
442                                    }
443                                });
444                            (
445                                ExecutionEventInner::Finished {
446                                    result,
447                                    http_client_traces,
448                                },
449                                child_finished,
450                                version,
451                            )
452                        }
453                    }
454                    WorkerError::DbError(db_error) => {
455                        return Err(db_error);
456                    }
457                    WorkerError::ActivityTrap {
458                        reason: reason_inner,
459                        trap_kind,
460                        detail,
461                        version,
462                        http_client_traces,
463                    } => retry_or_fail(
464                        result_obtained_at,
465                        parent,
466                        can_be_retried,
467                        reason_full,
468                        reason_inner,
469                        trap_kind, // short reason like `trap` for logs
470                        detail,
471                        version,
472                        http_client_traces,
473                    ),
474                    WorkerError::ActivityPreopenedDirError {
475                        reason_kind,
476                        reason_inner,
477                        version,
478                    } => retry_or_fail(
479                        result_obtained_at,
480                        parent,
481                        can_be_retried,
482                        reason_full,
483                        reason_inner,
484                        reason_kind, // short reason for logs
485                        None,        // detail
486                        version,
487                        None, // http_client_traces
488                    ),
489                    WorkerError::ActivityReturnedError {
490                        detail,
491                        version,
492                        http_client_traces,
493                    } => {
494                        let duration = can_be_retried.expect(
495                            "ActivityReturnedError must not be returned when retries are exhausted",
496                        );
497                        let expires_at = result_obtained_at + duration;
498                        debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
499                        (
500                            ExecutionEventInner::TemporarilyFailed {
501                                backoff_expires_at: expires_at,
502                                reason_full: StrVariant::Static("activity returned error"), // is same as the variant's display message.
503                                reason_inner: StrVariant::Static("activity returned error"),
504                                detail, // contains the backtrace
505                                http_client_traces,
506                            },
507                            None,
508                            version,
509                        )
510                    }
511                    WorkerError::LimitReached {
512                        reason: inner_reason,
513                        version: new_version,
514                    } => {
515                        let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
516                        warn!(
517                            "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
518                        );
519                        (
520                            ExecutionEventInner::Unlocked {
521                                backoff_expires_at: expires_at,
522                                reason: StrVariant::from(reason_full),
523                            },
524                            None,
525                            new_version,
526                        )
527                    }
528                    WorkerError::FatalError(fatal_error, version) => {
529                        warn!("Fatal worker error - {fatal_error:?}");
530                        let result = SupportedFunctionReturnValue::ExecutionError(
531                            FinishedExecutionError::from(fatal_error),
532                        );
533                        let child_finished =
534                            parent.map(|(parent_execution_id, parent_join_set)| {
535                                ChildFinishedResponse {
536                                    parent_execution_id,
537                                    parent_join_set,
538                                    result: result.clone(),
539                                }
540                            });
541                        (
542                            ExecutionEventInner::Finished {
543                                result,
544                                http_client_traces: None,
545                            },
546                            child_finished,
547                            version,
548                        )
549                    }
550                };
551                Some(Append {
552                    created_at: result_obtained_at,
553                    primary_event: AppendRequest {
554                        created_at: result_obtained_at,
555                        event: primary_event,
556                    },
557                    execution_id,
558                    version,
559                    child_finished,
560                })
561            }
562        })
563    }
564}
565
566#[expect(clippy::too_many_arguments)]
567fn retry_or_fail(
568    result_obtained_at: DateTime<Utc>,
569    parent: Option<(ExecutionId, JoinSetId)>,
570    can_be_retried: Option<Duration>,
571    reason_full: String,
572    reason_inner: String,
573    err_kind_for_logs: impl Display,
574    detail: Option<String>,
575    version: Version,
576    http_client_traces: Option<Vec<concepts::storage::http_client_trace::HttpClientTrace>>,
577) -> (ExecutionEventInner, Option<ChildFinishedResponse>, Version) {
578    if let Some(duration) = can_be_retried {
579        let expires_at = result_obtained_at + duration;
580        debug!(
581            "Retrying activity with `{err_kind_for_logs}` execution after {duration:?} at {expires_at}"
582        );
583        (
584            ExecutionEventInner::TemporarilyFailed {
585                backoff_expires_at: expires_at,
586                reason_full: StrVariant::from(reason_full),
587                reason_inner: StrVariant::from(reason_inner),
588                detail,
589                http_client_traces,
590            },
591            None,
592            version,
593        )
594    } else {
595        info!("Activity with `{err_kind_for_logs}` marked as permanent failure - {reason_inner}");
596        let result = SupportedFunctionReturnValue::ExecutionError(
597            FinishedExecutionError::PermanentFailure {
598                reason_inner,
599                reason_full,
600                kind: PermanentFailureKind::ActivityTrap,
601                detail,
602            },
603        );
604
605        let child_finished =
606            parent.map(
607                |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
608                    parent_execution_id,
609                    parent_join_set,
610                    result: result.clone(),
611                },
612            );
613        (
614            ExecutionEventInner::Finished {
615                result,
616                http_client_traces,
617            },
618            child_finished,
619            version,
620        )
621    }
622}
623
/// Notification for a parent execution that one of its child executions has finished.
#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    /// Parent execution to notify.
    pub(crate) parent_execution_id: ExecutionId,
    /// Join set of the parent the response is delivered to.
    pub(crate) parent_join_set: JoinSetId,
    /// The child's final result, copied into the parent's `ChildExecutionFinished` response.
    pub(crate) result: SupportedFunctionReturnValue,
}
630
/// A pending write of a worker outcome to the database, performed by `Append::append`.
#[derive(Debug, Clone)]
pub(crate) struct Append {
    /// Timestamp used for the parent response and the batch append.
    pub(crate) created_at: DateTime<Utc>,
    /// Event appended to the execution's own log.
    pub(crate) primary_event: AppendRequest,
    pub(crate) execution_id: ExecutionId,
    /// Version at which `primary_event` is appended.
    pub(crate) version: Version,
    /// Set when a parent must be notified; `append` then requires `primary_event` to be
    /// `Finished` and `execution_id` to be derived.
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}
639
640impl Append {
641    pub(crate) async fn append(self, db_exec: &dyn DbExecutor) -> Result<(), DbErrorWrite> {
642        if let Some(child_finished) = self.child_finished {
643            assert_matches!(
644                &self.primary_event,
645                AppendRequest {
646                    event: ExecutionEventInner::Finished { .. },
647                    ..
648                }
649            );
650            let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
651            let events = AppendEventsToExecution {
652                execution_id: self.execution_id,
653                version: self.version.clone(),
654                batch: vec![self.primary_event],
655            };
656            let response = AppendResponseToExecution {
657                parent_execution_id: child_finished.parent_execution_id,
658                parent_response_event: JoinSetResponseEventOuter {
659                    created_at: self.created_at,
660                    event: JoinSetResponseEvent {
661                        join_set_id: child_finished.parent_join_set,
662                        event: JoinSetResponse::ChildExecutionFinished {
663                            child_execution_id: derived,
664                            // Since self.primary_event is a finished event, the version will remain the same.
665                            finished_version: self.version,
666                            result: child_finished.result,
667                        },
668                    },
669                },
670            };
671
672            db_exec
673                .append_batch_respond_to_parent(events, response, self.created_at)
674                .await?;
675        } else {
676            db_exec
677                .append(self.execution_id, self.version, self.primary_event)
678                .await?;
679        }
680        Ok(())
681    }
682}
683
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes, RETURN_TYPE_DUMMY,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");

    /// Scripted worker results keyed by the version the worker expects to run at.
    /// Entries are consumed from the end (`IndexMap::pop`), i.e. in reverse insertion order.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    /// Test worker that replays scripted `WorkerResult`s and asserts that it is run with the
    /// expected version and event history.
    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    /// Metadata for a single exported, submittable function with default parameter types
    /// and the dummy return type.
    fn single_export(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
        [FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: RETURN_TYPE_DUMMY,
            extension: None,
            submittable: true,
        }]
    }

    impl SimpleWorker {
        /// Worker scripted to return `res` once, expecting to be run at version 2
        /// with an empty event history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let mut scripted = IndexMap::new();
            scripted.insert(Version::new(2), (vec![], res));
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(scripted)))
        }

        /// Returns the same worker exporting `ffqn` instead of its current function.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            Self {
                exported: single_export(ffqn.clone()),
                ffqn,
                worker_results_rev: self.worker_results_rev,
            }
        }

        /// Worker exporting `FFQN_SOME` that replays `worker_results_rev`.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: single_export(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            // Pop and unwrap in one statement so the lock guard is dropped at its end.
            let (expected_version, (expected_eh, worker_result)) =
                self.worker_results_rev.lock().unwrap().pop().unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}
763
764#[cfg(test)]
765mod tests {
766    use self::simple_worker::SimpleWorker;
767    use super::*;
768    use crate::{expired_timers_watcher, worker::WorkerResult};
769    use assert_matches::assert_matches;
770    use async_trait::async_trait;
771    use concepts::storage::DbPoolCloseable;
772    use concepts::storage::{CreateRequest, DbConnection, JoinSetRequest};
773    use concepts::storage::{ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState};
774    use concepts::time::Now;
775    use concepts::{
776        ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, RETURN_TYPE_DUMMY,
777        SUPPORTED_RETURN_VALUE_OK_EMPTY, StrVariant, SupportedFunctionReturnValue, TrapKind,
778    };
779    use db_tests::Database;
780    use indexmap::IndexMap;
781    use simple_worker::FFQN_SOME;
782    use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
783    use test_utils::set_up;
784    use test_utils::sim_clock::{ConstClock, SimClock};
785
    /// Function name used for the child execution in the parent/child notification tests
    /// (as the child's `ffqn`, the `target_ffqn` of the join-set request and the child worker's ffqn).
    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
787
788    async fn tick_fn<W: Worker + Debug, C: ClockFn + 'static>(
789        config: ExecConfig,
790        clock_fn: C,
791        db_exec: Arc<dyn DbExecutor>,
792        worker: Arc<W>,
793        executed_at: DateTime<Utc>,
794    ) -> Vec<ExecutionId> {
795        trace!("Ticking with {worker:?}");
796        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
797        let executor = ExecTask::new_test(worker, config, clock_fn, db_exec, ffqns);
798        executor
799            .tick_test_await(executed_at, RunId::generate())
800            .await
801    }
802
803    #[tokio::test]
804    async fn execute_simple_lifecycle_tick_based_mem() {
805        let created_at = Now.now();
806        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
807        execute_simple_lifecycle_tick_based(
808            db_pool.connection().as_ref(),
809            db_exec.clone(),
810            ConstClock(created_at),
811        )
812        .await;
813        db_close.close().await;
814    }
815
816    #[tokio::test]
817    async fn execute_simple_lifecycle_tick_based_sqlite() {
818        let created_at = Now.now();
819        let (_guard, db_pool, db_exec, db_close) = Database::Sqlite.set_up().await;
820        execute_simple_lifecycle_tick_based(
821            db_pool.connection().as_ref(),
822            db_exec.clone(),
823            ConstClock(created_at),
824        )
825        .await;
826        db_close.close().await;
827    }
828
829    async fn execute_simple_lifecycle_tick_based<C: ClockFn + 'static>(
830        db_connection: &dyn DbConnection,
831        db_exec: Arc<dyn DbExecutor>,
832        clock_fn: C,
833    ) {
834        set_up();
835        let created_at = clock_fn.now();
836        let exec_config = ExecConfig {
837            batch_size: 1,
838            lock_expiry: Duration::from_secs(1),
839            tick_sleep: Duration::from_millis(100),
840            component_id: ComponentId::dummy_activity(),
841            task_limiter: None,
842            executor_id: ExecutorId::generate(),
843            retry_config: ComponentRetryConfig::ZERO,
844        };
845
846        let execution_log = create_and_tick(
847            CreateAndTickConfig {
848                execution_id: ExecutionId::generate(),
849                created_at,
850                executed_at: created_at,
851            },
852            clock_fn,
853            db_connection,
854            db_exec,
855            exec_config,
856            Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
857                SUPPORTED_RETURN_VALUE_OK_EMPTY,
858                Version::new(2),
859                None,
860            ))),
861            tick_fn,
862        )
863        .await;
864        assert_matches!(
865            execution_log.events.get(2).unwrap(),
866            ExecutionEvent {
867                event: ExecutionEventInner::Finished {
868                    result: SupportedFunctionReturnValue::Ok { ok: None },
869                    http_client_traces: None
870                },
871                created_at: _,
872                backtrace_id: None,
873            }
874        );
875    }
876
877    #[tokio::test]
878    async fn execute_simple_lifecycle_task_based_mem() {
879        set_up();
880        let created_at = Now.now();
881        let clock_fn = ConstClock(created_at);
882        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
883        let exec_config = ExecConfig {
884            batch_size: 1,
885            lock_expiry: Duration::from_secs(1),
886            tick_sleep: Duration::ZERO,
887            component_id: ComponentId::dummy_activity(),
888            task_limiter: None,
889            executor_id: ExecutorId::generate(),
890            retry_config: ComponentRetryConfig::ZERO,
891        };
892
893        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
894            SUPPORTED_RETURN_VALUE_OK_EMPTY,
895            Version::new(2),
896            None,
897        )));
898
899        let execution_log = create_and_tick(
900            CreateAndTickConfig {
901                execution_id: ExecutionId::generate(),
902                created_at,
903                executed_at: created_at,
904            },
905            clock_fn,
906            db_pool.connection().as_ref(),
907            db_exec,
908            exec_config,
909            worker,
910            tick_fn,
911        )
912        .await;
913        assert_matches!(
914            execution_log.events.get(2).unwrap(),
915            ExecutionEvent {
916                event: ExecutionEventInner::Finished {
917                    result: SupportedFunctionReturnValue::Ok { ok: None },
918                    http_client_traces: None
919                },
920                created_at: _,
921                backtrace_id: None,
922            }
923        );
924        db_close.close().await;
925    }
926
927    struct CreateAndTickConfig {
928        execution_id: ExecutionId,
929        created_at: DateTime<Utc>,
930        executed_at: DateTime<Utc>,
931    }
932
933    async fn create_and_tick<
934        W: Worker,
935        C: ClockFn,
936        T: FnMut(ExecConfig, C, Arc<dyn DbExecutor>, Arc<W>, DateTime<Utc>) -> F,
937        F: Future<Output = Vec<ExecutionId>>,
938    >(
939        config: CreateAndTickConfig,
940        clock_fn: C,
941        db_connection: &dyn DbConnection,
942        db_exec: Arc<dyn DbExecutor>,
943        exec_config: ExecConfig,
944        worker: Arc<W>,
945        mut tick: T,
946    ) -> ExecutionLog {
947        // Create an execution
948        db_connection
949            .create(CreateRequest {
950                created_at: config.created_at,
951                execution_id: config.execution_id.clone(),
952                ffqn: FFQN_SOME,
953                params: Params::empty(),
954                parent: None,
955                metadata: concepts::ExecutionMetadata::empty(),
956                scheduled_at: config.created_at,
957                component_id: ComponentId::dummy_activity(),
958                scheduled_by: None,
959            })
960            .await
961            .unwrap();
962        // execute!
963        tick(exec_config, clock_fn, db_exec, worker, config.executed_at).await;
964        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
965        debug!("Execution history after tick: {execution_log:?}");
966        // check that DB contains Created and Locked events.
967        assert_matches!(
968            execution_log.events.first().unwrap(),
969            ExecutionEvent {
970                event: ExecutionEventInner::Created { .. },
971                created_at: actually_created_at,
972                backtrace_id: None,
973            }
974            if config.created_at == *actually_created_at
975        );
976        let locked_at = assert_matches!(
977            execution_log.events.get(1).unwrap(),
978            ExecutionEvent {
979                event: ExecutionEventInner::Locked { .. },
980                created_at: locked_at,
981                backtrace_id: None,
982            } if config.created_at <= *locked_at
983            => *locked_at
984        );
985        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
986            event: _,
987            created_at: executed_at,
988            backtrace_id: None,
989        } if *executed_at >= locked_at);
990        execution_log
991    }
992
    /// A trapping activity configured with `max_retries: 1` must be retried.
    ///
    /// - The first run is recorded as `TemporarilyFailed`, with the backoff
    ///   expiring `retry_exp_backoff` later.
    /// - A tick before the backoff expires picks nothing up.
    /// - A tick after it expires locks the execution again, and the second
    ///   worker run finishes it successfully.
    #[tokio::test]
    async fn activity_trap_should_trigger_an_execution_retry() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
        let retry_exp_backoff = Duration::from_millis(100);
        let retry_config = ComponentRetryConfig {
            max_retries: 1,
            retry_exp_backoff,
        };
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
            retry_config,
        };
        let expected_reason = "error reason";
        let expected_detail = "error detail";
        // First run: the worker is expected at version 2 (after `Created` and `Locked`) and traps.
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: Some(expected_detail.to_string()),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at: sim_clock.now(),
                executed_at: sim_clock.now(),
            },
            sim_clock.clone(),
            db_pool.connection().as_ref(),
            db_exec.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        // Created, Locked, TemporarilyFailed.
        assert_eq!(3, execution_log.events.len());
        {
            let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
                &execution_log.events.get(2).unwrap(),
                ExecutionEvent {
                    event: ExecutionEventInner::TemporarilyFailed {
                        reason_inner,
                        reason_full,
                        detail,
                        backoff_expires_at,
                        http_client_traces: None,
                    },
                    created_at: at,
                    backtrace_id: None,
                }
                => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
            );
            assert_eq!(expected_reason, reason_inner.deref());
            // The full reason is the inner reason prefixed with "activity trap: ".
            assert_eq!(
                format!("activity trap: {expected_reason}"),
                reason_full.deref()
            );
            assert_eq!(Some(expected_detail), detail.as_deref());
            assert_eq!(at, sim_clock.now());
            // The first retry is scheduled exactly `retry_exp_backoff` after the failure.
            assert_eq!(sim_clock.now() + retry_config.retry_exp_backoff, expires_at);
        }
        // Second run: the worker is expected at version 4 (right after the new lock at
        // event index 3, asserted below) with an empty event history, and succeeds.
        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
            std::sync::Mutex::new(IndexMap::from([(
                Version::new(4),
                (
                    vec![],
                    WorkerResult::Ok(SUPPORTED_RETURN_VALUE_OK_EMPTY, Version::new(4), None),
                ),
            )])),
        )));
        // noop until `retry_exp_backoff` expires
        assert!(
            tick_fn(
                exec_config.clone(),
                sim_clock.clone(),
                db_exec.clone(),
                worker.clone(),
                sim_clock.now(),
            )
            .await
            .is_empty()
        );
        // tick again to finish the execution
        sim_clock.move_time_forward(retry_config.retry_exp_backoff);
        tick_fn(
            exec_config,
            sim_clock.clone(),
            db_exec.clone(),
            worker,
            sim_clock.now(),
        )
        .await;
        let execution_log = {
            let db_connection = db_pool.connection();
            db_connection
                .get(&execution_log.execution_id)
                .await
                .unwrap()
        };
        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
        // The retry re-locks the execution at the moment the backoff expired...
        assert_matches!(
            execution_log.events.get(3).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: at,
                backtrace_id: None,
            } if *at == sim_clock.now()
        );
        // ...and the second worker run finishes it with an empty `Ok` at the same instant.
        assert_matches!(
            execution_log.events.get(4).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: SupportedFunctionReturnValue::Ok{ok:None},
                    http_client_traces: None
                },
                created_at: finished_at,
                backtrace_id: None,
            } if *finished_at == sim_clock.now()
        );
        db_close.close().await;
    }
1124
1125    #[tokio::test]
1126    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1127        set_up();
1128        let created_at = Now.now();
1129        let clock_fn = ConstClock(created_at);
1130        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1131        let exec_config = ExecConfig {
1132            batch_size: 1,
1133            lock_expiry: Duration::from_secs(1),
1134            tick_sleep: Duration::ZERO,
1135            component_id: ComponentId::dummy_activity(),
1136            task_limiter: None,
1137            executor_id: ExecutorId::generate(),
1138            retry_config: ComponentRetryConfig::ZERO,
1139        };
1140
1141        let expected_reason = "error reason";
1142        let expected_detail = "error detail";
1143        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1144            WorkerError::ActivityTrap {
1145                reason: expected_reason.to_string(),
1146                trap_kind: concepts::TrapKind::Trap,
1147                detail: Some(expected_detail.to_string()),
1148                version: Version::new(2),
1149                http_client_traces: None,
1150            },
1151        )));
1152        let execution_log = create_and_tick(
1153            CreateAndTickConfig {
1154                execution_id: ExecutionId::generate(),
1155                created_at,
1156                executed_at: created_at,
1157            },
1158            clock_fn,
1159            db_pool.connection().as_ref(),
1160            db_exec,
1161            exec_config.clone(),
1162            worker,
1163            tick_fn,
1164        )
1165        .await;
1166        assert_eq!(3, execution_log.events.len());
1167        let (reason, kind, detail) = assert_matches!(
1168            &execution_log.events.get(2).unwrap(),
1169            ExecutionEvent {
1170                event: ExecutionEventInner::Finished{
1171                    result: SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_}),
1172                    http_client_traces: None
1173                },
1174                created_at: at,
1175                backtrace_id: None,
1176            } if *at == created_at
1177            => (reason_inner, kind, detail)
1178        );
1179        assert_eq!(expected_reason, *reason);
1180        assert_eq!(Some(expected_detail), detail.as_deref());
1181        assert_eq!(PermanentFailureKind::ActivityTrap, *kind);
1182
1183        db_close.close().await;
1184    }
1185
1186    #[tokio::test]
1187    async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1188        let worker_error = WorkerError::ActivityTrap {
1189            reason: "error reason".to_string(),
1190            trap_kind: TrapKind::Trap,
1191            detail: Some("detail".to_string()),
1192            version: Version::new(2),
1193            http_client_traces: None,
1194        };
1195        let expected_child_err = FinishedExecutionError::PermanentFailure {
1196            reason_full: "activity trap: error reason".to_string(),
1197            reason_inner: "error reason".to_string(),
1198            kind: PermanentFailureKind::ActivityTrap,
1199            detail: Some("detail".to_string()),
1200        };
1201        child_execution_permanently_failed_should_notify_parent(
1202            WorkerResult::Err(worker_error),
1203            expected_child_err,
1204        )
1205        .await;
1206    }
1207
1208    // TODO: Add test for WorkerError::TemporaryTimeout
1209
1210    #[tokio::test]
1211    async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1212        let expected_child_err = FinishedExecutionError::PermanentTimeout;
1213        child_execution_permanently_failed_should_notify_parent(
1214            WorkerResult::DbUpdatedByWorkerOrWatcher,
1215            expected_child_err,
1216        )
1217        .await;
1218    }
1219
1220    async fn child_execution_permanently_failed_should_notify_parent(
1221        worker_result: WorkerResult,
1222        expected_child_err: FinishedExecutionError,
1223    ) {
1224        use concepts::storage::JoinSetResponseEventOuter;
1225        const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1226
1227        set_up();
1228        let sim_clock = SimClock::default();
1229        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1230
1231        let parent_worker = Arc::new(SimpleWorker::with_single_result(
1232            WorkerResult::DbUpdatedByWorkerOrWatcher,
1233        ));
1234        let parent_execution_id = ExecutionId::generate();
1235        db_pool
1236            .connection()
1237            .create(CreateRequest {
1238                created_at: sim_clock.now(),
1239                execution_id: parent_execution_id.clone(),
1240                ffqn: FFQN_SOME,
1241                params: Params::empty(),
1242                parent: None,
1243                metadata: concepts::ExecutionMetadata::empty(),
1244                scheduled_at: sim_clock.now(),
1245                component_id: ComponentId::dummy_activity(),
1246                scheduled_by: None,
1247            })
1248            .await
1249            .unwrap();
1250        tick_fn(
1251            ExecConfig {
1252                batch_size: 1,
1253                lock_expiry: LOCK_EXPIRY,
1254                tick_sleep: Duration::ZERO,
1255                component_id: ComponentId::dummy_activity(),
1256                task_limiter: None,
1257                executor_id: ExecutorId::generate(),
1258                retry_config: ComponentRetryConfig::ZERO,
1259            },
1260            sim_clock.clone(),
1261            db_exec.clone(),
1262            parent_worker,
1263            sim_clock.now(),
1264        )
1265        .await;
1266
1267        let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1268        let child_execution_id = parent_execution_id.next_level(&join_set_id);
1269        // executor does not append anything, this should have been written by the worker:
1270        {
1271            let params = Params::empty();
1272            let child = CreateRequest {
1273                created_at: sim_clock.now(),
1274                execution_id: ExecutionId::Derived(child_execution_id.clone()),
1275                ffqn: FFQN_CHILD,
1276                params: params.clone(),
1277                parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1278                metadata: concepts::ExecutionMetadata::empty(),
1279                scheduled_at: sim_clock.now(),
1280                component_id: ComponentId::dummy_activity(),
1281                scheduled_by: None,
1282            };
1283            let current_time = sim_clock.now();
1284            let join_set = AppendRequest {
1285                created_at: current_time,
1286                event: ExecutionEventInner::HistoryEvent {
1287                    event: HistoryEvent::JoinSetCreate {
1288                        join_set_id: join_set_id.clone(),
1289                        closing_strategy: ClosingStrategy::Complete,
1290                    },
1291                },
1292            };
1293            let child_exec_req = AppendRequest {
1294                created_at: current_time,
1295                event: ExecutionEventInner::HistoryEvent {
1296                    event: HistoryEvent::JoinSetRequest {
1297                        join_set_id: join_set_id.clone(),
1298                        request: JoinSetRequest::ChildExecutionRequest {
1299                            child_execution_id: child_execution_id.clone(),
1300                            target_ffqn: FFQN_CHILD,
1301                            params,
1302                        },
1303                    },
1304                },
1305            };
1306            let join_next = AppendRequest {
1307                created_at: current_time,
1308                event: ExecutionEventInner::HistoryEvent {
1309                    event: HistoryEvent::JoinNext {
1310                        join_set_id: join_set_id.clone(),
1311                        run_expires_at: sim_clock.now(),
1312                        closing: false,
1313                        requested_ffqn: Some(FFQN_CHILD),
1314                    },
1315                },
1316            };
1317            db_pool
1318                .connection()
1319                .append_batch_create_new_execution(
1320                    current_time,
1321                    vec![join_set, child_exec_req, join_next],
1322                    parent_execution_id.clone(),
1323                    Version::new(2),
1324                    vec![child],
1325                )
1326                .await
1327                .unwrap();
1328        }
1329
1330        let child_worker =
1331            Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));
1332
1333        // execute the child
1334        tick_fn(
1335            ExecConfig {
1336                batch_size: 1,
1337                lock_expiry: LOCK_EXPIRY,
1338                tick_sleep: Duration::ZERO,
1339                component_id: ComponentId::dummy_activity(),
1340                task_limiter: None,
1341                executor_id: ExecutorId::generate(),
1342                retry_config: ComponentRetryConfig::ZERO,
1343            },
1344            sim_clock.clone(),
1345            db_exec.clone(),
1346            child_worker,
1347            sim_clock.now(),
1348        )
1349        .await;
1350        if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
1351            // In case of timeout, let the timers watcher handle it
1352            sim_clock.move_time_forward(LOCK_EXPIRY);
1353            expired_timers_watcher::tick(db_pool.connection().as_ref(), sim_clock.now())
1354                .await
1355                .unwrap();
1356        }
1357        let child_log = db_pool
1358            .connection()
1359            .get(&ExecutionId::Derived(child_execution_id.clone()))
1360            .await
1361            .unwrap();
1362        assert!(child_log.pending_state.is_finished());
1363        assert_eq!(
1364            Version(2),
1365            child_log.next_version,
1366            "created = 0, locked = 1, with_single_result = 2"
1367        );
1368        assert_eq!(
1369            ExecutionEventInner::Finished {
1370                result: SupportedFunctionReturnValue::ExecutionError(expected_child_err),
1371                http_client_traces: None
1372            },
1373            child_log.last_event().event
1374        );
1375        let parent_log = db_pool
1376            .connection()
1377            .get(&parent_execution_id)
1378            .await
1379            .unwrap();
1380        assert_matches!(
1381            parent_log.pending_state,
1382            PendingState::PendingAt {
1383                scheduled_at
1384            } if scheduled_at == sim_clock.now(),
1385            "parent should be back to pending"
1386        );
1387        let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1388            parent_log.responses.last(),
1389            Some(JoinSetResponseEventOuter{
1390                created_at: at,
1391                event: JoinSetResponseEvent{
1392                    join_set_id: found_join_set_id,
1393                    event: JoinSetResponse::ChildExecutionFinished {
1394                        child_execution_id: found_child_execution_id,
1395                        finished_version,
1396                        result: found_result,
1397                    }
1398                }
1399            })
1400             if *at == sim_clock.now()
1401            => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1402        );
1403        assert_eq!(join_set_id, *found_join_set_id);
1404        assert_eq!(child_execution_id, *found_child_execution_id);
1405        assert_eq!(child_log.next_version, *child_finished_version);
1406        assert_matches!(
1407            found_result,
1408            SupportedFunctionReturnValue::ExecutionError(_)
1409        );
1410
1411        db_close.close().await;
1412    }
1413
1414    #[derive(Clone, Debug)]
1415    struct SleepyWorker {
1416        duration: Duration,
1417        result: SupportedFunctionReturnValue,
1418        exported: [FunctionMetadata; 1],
1419    }
1420
1421    #[async_trait]
1422    impl Worker for SleepyWorker {
1423        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1424            tokio::time::sleep(self.duration).await;
1425            WorkerResult::Ok(self.result.clone(), ctx.version, None)
1426        }
1427
1428        fn exported_functions(&self) -> &[FunctionMetadata] {
1429            &self.exported
1430        }
1431    }
1432
1433    #[tokio::test]
1434    async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1435        set_up();
1436        let sim_clock = SimClock::default();
1437        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1438        let lock_expiry = Duration::from_millis(100);
1439        let timeout_duration = Duration::from_millis(300);
1440        let retry_config = ComponentRetryConfig {
1441            max_retries: 1,
1442            retry_exp_backoff: timeout_duration,
1443        };
1444        let exec_config = ExecConfig {
1445            batch_size: 1,
1446            lock_expiry,
1447            tick_sleep: Duration::ZERO,
1448            component_id: ComponentId::dummy_activity(),
1449            task_limiter: None,
1450            executor_id: ExecutorId::generate(),
1451            retry_config,
1452        };
1453
1454        let worker = Arc::new(SleepyWorker {
1455            duration: lock_expiry + Duration::from_millis(1), // sleep more than allowed by the lock expiry
1456            result: SUPPORTED_RETURN_VALUE_OK_EMPTY,
1457            exported: [FunctionMetadata {
1458                ffqn: FFQN_SOME,
1459                parameter_types: ParameterTypes::default(),
1460                return_type: RETURN_TYPE_DUMMY,
1461                extension: None,
1462                submittable: true,
1463            }],
1464        });
1465        // Create an execution
1466        let execution_id = ExecutionId::generate();
1467        let db_connection = db_pool.connection();
1468        db_connection
1469            .create(CreateRequest {
1470                created_at: sim_clock.now(),
1471                execution_id: execution_id.clone(),
1472                ffqn: FFQN_SOME,
1473                params: Params::empty(),
1474                parent: None,
1475                metadata: concepts::ExecutionMetadata::empty(),
1476                scheduled_at: sim_clock.now(),
1477                component_id: ComponentId::dummy_activity(),
1478                scheduled_by: None,
1479            })
1480            .await
1481            .unwrap();
1482
1483        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
1484        let executor = ExecTask::new_test(
1485            worker,
1486            exec_config,
1487            sim_clock.clone(),
1488            db_exec.clone(),
1489            ffqns,
1490        );
1491        let mut first_execution_progress = executor
1492            .tick(sim_clock.now(), RunId::generate())
1493            .await
1494            .unwrap();
1495        assert_eq!(1, first_execution_progress.executions.len());
1496        // Started hanging, wait for lock expiry.
1497        sim_clock.move_time_forward(lock_expiry);
1498        // cleanup should be called
1499        let now_after_first_lock_expiry = sim_clock.now();
1500        {
1501            debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1502            let cleanup_progress = executor
1503                .tick(now_after_first_lock_expiry, RunId::generate())
1504                .await
1505                .unwrap();
1506            assert!(cleanup_progress.executions.is_empty());
1507        }
1508        {
1509            let expired_locks = expired_timers_watcher::tick(
1510                db_pool.connection().as_ref(),
1511                now_after_first_lock_expiry,
1512            )
1513            .await
1514            .unwrap()
1515            .expired_locks;
1516            assert_eq!(1, expired_locks);
1517        }
1518        assert!(
1519            !first_execution_progress
1520                .executions
1521                .pop()
1522                .unwrap()
1523                .1
1524                .is_finished()
1525        );
1526
1527        let execution_log = db_connection.get(&execution_id).await.unwrap();
1528        let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1529        assert_matches!(
1530            &execution_log.events.get(2).unwrap(),
1531            ExecutionEvent {
1532                event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
1533                created_at: at,
1534                backtrace_id: None,
1535            } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1536        );
1537        assert_eq!(
1538            PendingState::PendingAt {
1539                scheduled_at: expected_first_timeout_expiry
1540            },
1541            execution_log.pending_state
1542        );
1543        sim_clock.move_time_forward(timeout_duration);
1544        let now_after_first_timeout = sim_clock.now();
1545        debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1546
1547        let mut second_execution_progress = executor
1548            .tick(now_after_first_timeout, RunId::generate())
1549            .await
1550            .unwrap();
1551        assert_eq!(1, second_execution_progress.executions.len());
1552
1553        // Started hanging, wait for lock expiry.
1554        sim_clock.move_time_forward(lock_expiry);
1555        // cleanup should be called
1556        let now_after_second_lock_expiry = sim_clock.now();
1557        debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1558        {
1559            let cleanup_progress = executor
1560                .tick(now_after_second_lock_expiry, RunId::generate())
1561                .await
1562                .unwrap();
1563            assert!(cleanup_progress.executions.is_empty());
1564        }
1565        {
1566            let expired_locks = expired_timers_watcher::tick(
1567                db_pool.connection().as_ref(),
1568                now_after_second_lock_expiry,
1569            )
1570            .await
1571            .unwrap()
1572            .expired_locks;
1573            assert_eq!(1, expired_locks);
1574        }
1575        assert!(
1576            !second_execution_progress
1577                .executions
1578                .pop()
1579                .unwrap()
1580                .1
1581                .is_finished()
1582        );
1583
1584        drop(db_connection);
1585        drop(executor);
1586        db_close.close().await;
1587    }
1588}