obeli_sk_executor/
executor.rs

1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::prefixed_ulid::RunId;
5use concepts::storage::{
6    AppendEventsToExecution, AppendRequest, AppendResponseToExecution, DbErrorGeneric,
7    DbErrorWrite, DbExecutor, ExecutionLog, LockedExecution,
8};
9use concepts::time::{ClockFn, Sleep};
10use concepts::{
11    ComponentId, ComponentRetryConfig, FunctionMetadata, StrVariant, SupportedFunctionReturnValue,
12};
13use concepts::{ExecutionFailureKind, JoinSetId};
14use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
15use concepts::{
16    FinishedExecutionError,
17    storage::{ExecutionEventInner, Version},
18};
19use std::{
20    sync::{
21        Arc,
22        atomic::{AtomicBool, Ordering},
23    },
24    time::Duration,
25};
26use tokio::task::{AbortHandle, JoinHandle};
27use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
28
/// Configuration of a single executor ([`ExecTask`]).
#[derive(Debug, Clone)]
pub struct ExecConfig {
    /// Added to the tick timestamp to compute the lock expiry passed to `lock_pending`.
    pub lock_expiry: Duration,
    /// Duration of the fallback sleep handed to `DbExecutor::wait_for_pending` between ticks
    /// (presumably the maximum idle wait — TODO confirm against `wait_for_pending`).
    pub tick_sleep: Duration,
    /// Maximum number of executions locked (and task permits requested) per tick.
    pub batch_size: u32,
    /// Component identity passed to `lock_pending` and attached to tracing spans.
    pub component_id: ComponentId,
    /// Optional semaphore bounding concurrently running worker tasks; each spawned worker
    /// task holds one permit until it completes. `None` means no limit beyond `batch_size`.
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
    /// Executor identity passed to `lock_pending` (presumably recorded as the lock owner —
    /// verify in the `DbExecutor` implementation).
    pub executor_id: ExecutorId,
    /// Retry policy (`max_retries`, `retry_exp_backoff`) used when mapping worker errors.
    pub retry_config: ComponentRetryConfig,
}
39
/// An executor: locks pending executions of `ffqns` and runs each one on `worker` in its
/// own Tokio task.
pub struct ExecTask<C: ClockFn> {
    /// The worker that runs every locked execution.
    worker: Arc<dyn Worker>,
    config: ExecConfig,
    clock_fn: C, // Used for obtaining current time when the execution finishes.
    /// Database used for locking executions and appending their results.
    db_exec: Arc<dyn DbExecutor>,
    /// Functions whose pending executions this executor locks.
    ffqns: Arc<[FunctionFqn]>,
}
47
48#[derive(derive_more::Debug, Default)]
49pub struct ExecutionProgress {
50    #[debug(skip)]
51    #[allow(dead_code)]
52    executions: Vec<(ExecutionId, JoinHandle<()>)>,
53}
54
55impl ExecutionProgress {
56    #[cfg(feature = "test")]
57    pub async fn wait_for_tasks(self) -> Vec<ExecutionId> {
58        let mut vec = Vec::new();
59        for (exe, join_handle) in self.executions {
60            vec.push(exe);
61            join_handle.await.unwrap();
62        }
63        vec
64    }
65}
66
/// Handle to an executor loop spawned by [`ExecTask::spawn_new`].
///
/// Call [`ExecutorTaskHandle::close`] for a graceful stop; dropping the handle aborts the
/// loop task if it is still running.
#[derive(derive_more::Debug)]
pub struct ExecutorTaskHandle {
    /// Set by `close`; the executor loop checks it before each iteration.
    #[debug(skip)]
    is_closing: Arc<AtomicBool>,
    /// Abort handle of the spawned executor loop task.
    #[debug(skip)]
    abort_handle: AbortHandle,
    /// Copied from the config; used only for tracing fields.
    component_id: ComponentId,
    /// Copied from the config; used only for tracing fields.
    executor_id: ExecutorId,
}
76
77impl ExecutorTaskHandle {
78    #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
79    pub async fn close(&self) {
80        trace!("Gracefully closing");
81        // TODO: Close all the workers tasks as well.
82        // Simple solution:
83        // Attempt to unlock all in-progress executions. Activities will not be penalized for shutdown, although the number of retries would increase,
84        // some progress will be lost.
85        // More complex solution:
86        // Activities will be awaited, up to some deadline, then unlocked.
87        // Workflows that are awating should be signaled using `DeadlineTracker`.
88        self.is_closing.store(true, Ordering::Relaxed);
89        while !self.abort_handle.is_finished() {
90            tokio::time::sleep(Duration::from_millis(1)).await;
91        }
92        debug!("Gracefully closed");
93    }
94}
95
96impl Drop for ExecutorTaskHandle {
97    #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
98    fn drop(&mut self) {
99        if self.abort_handle.is_finished() {
100            return;
101        }
102        warn!("Aborting the executor task");
103        self.abort_handle.abort();
104    }
105}
106
/// Test-only access to the FFQN extraction used by [`ExecTask::spawn_new`].
#[cfg(feature = "test")]
pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    self::extract_exported_ffqns_noext(worker)
}
111
112fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
113    worker
114        .exported_functions()
115        .iter()
116        .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
117        .collect::<Arc<_>>()
118}
119
120impl<C: ClockFn + 'static> ExecTask<C> {
121    #[cfg(feature = "test")]
122    pub fn new_test(
123        worker: Arc<dyn Worker>,
124        config: ExecConfig,
125        clock_fn: C,
126        db_exec: Arc<dyn DbExecutor>,
127        ffqns: Arc<[FunctionFqn]>,
128    ) -> Self {
129        Self {
130            worker,
131            config,
132            clock_fn,
133            db_exec,
134            ffqns,
135        }
136    }
137
138    #[cfg(feature = "test")]
139    pub fn new_all_ffqns_test(
140        worker: Arc<dyn Worker>,
141        config: ExecConfig,
142        clock_fn: C,
143        db_exec: Arc<dyn DbExecutor>,
144    ) -> Self {
145        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
146        Self {
147            worker,
148            config,
149            clock_fn,
150            db_exec,
151            ffqns,
152        }
153    }
154
155    pub fn spawn_new(
156        worker: Arc<dyn Worker>,
157        config: ExecConfig,
158        clock_fn: C,
159        db_exec: Arc<dyn DbExecutor>,
160        sleep: impl Sleep + 'static,
161    ) -> ExecutorTaskHandle {
162        let is_closing = Arc::new(AtomicBool::default());
163        let is_closing_inner = is_closing.clone();
164        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
165        let component_id = config.component_id.clone();
166        let executor_id = config.executor_id;
167        let abort_handle = tokio::spawn(async move {
168            debug!(executor_id = %config.executor_id, component_id = %config.component_id, "Spawned executor");
169            let task = ExecTask {
170                worker,
171                config,
172                db_exec,
173                ffqns: ffqns.clone(),
174                clock_fn: clock_fn.clone(),
175            };
176            while !is_closing_inner.load(Ordering::Relaxed) {
177                let _ = task.tick(clock_fn.now(), RunId::generate()).await;
178
179                task.db_exec
180                    .wait_for_pending(clock_fn.now(), ffqns.clone(), {
181                        let sleep = sleep.clone();
182                        Box::pin(async move { sleep.sleep(task.config.tick_sleep).await })})
183                    .await;
184            }
185        })
186        .abort_handle();
187        ExecutorTaskHandle {
188            is_closing,
189            abort_handle,
190            component_id,
191            executor_id,
192        }
193    }
194
195    fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
196        if let Some(task_limiter) = &self.config.task_limiter {
197            let mut locks = Vec::new();
198            for _ in 0..self.config.batch_size {
199                if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
200                    locks.push(Some(permit));
201                } else {
202                    break;
203                }
204            }
205            locks
206        } else {
207            let mut vec = Vec::with_capacity(self.config.batch_size as usize);
208            for _ in 0..self.config.batch_size {
209                vec.push(None);
210            }
211            vec
212        }
213    }
214
215    #[cfg(feature = "test")]
216    pub async fn tick_test(&self, executed_at: DateTime<Utc>, run_id: RunId) -> ExecutionProgress {
217        self.tick(executed_at, run_id).await.unwrap()
218    }
219
220    #[cfg(feature = "test")]
221    pub async fn tick_test_await(
222        &self,
223        executed_at: DateTime<Utc>,
224        run_id: RunId,
225    ) -> Vec<ExecutionId> {
226        self.tick(executed_at, run_id)
227            .await
228            .unwrap()
229            .wait_for_tasks()
230            .await
231    }
232
233    #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.config.executor_id, component_id = %self.config.component_id))]
234    async fn tick(
235        &self,
236        executed_at: DateTime<Utc>,
237        run_id: RunId,
238    ) -> Result<ExecutionProgress, DbErrorGeneric> {
239        let locked_executions = {
240            let mut permits = self.acquire_task_permits();
241            if permits.is_empty() {
242                return Ok(ExecutionProgress::default());
243            }
244            let lock_expires_at = executed_at + self.config.lock_expiry;
245            let locked_executions = self
246                .db_exec
247                .lock_pending(
248                    permits.len(), // batch size
249                    executed_at,   // fetch expiring before now
250                    self.ffqns.clone(),
251                    executed_at, // created at
252                    self.config.component_id.clone(),
253                    self.config.executor_id,
254                    lock_expires_at,
255                    run_id,
256                    self.config.retry_config,
257                )
258                .await?;
259            // Drop permits if too many were allocated.
260            while permits.len() > locked_executions.len() {
261                permits.pop();
262            }
263            assert_eq!(permits.len(), locked_executions.len());
264            locked_executions.into_iter().zip(permits)
265        };
266
267        let mut executions = Vec::with_capacity(locked_executions.len());
268        for (locked_execution, permit) in locked_executions {
269            let execution_id = locked_execution.execution_id.clone();
270            let join_handle = {
271                let worker = self.worker.clone();
272                let db = self.db_exec.clone();
273                let clock_fn = self.clock_fn.clone();
274                let worker_span = info_span!(parent: None, "worker",
275                    "otel.name" = format!("worker {}", locked_execution.ffqn),
276                    %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.config.executor_id, component_id = %self.config.component_id);
277                locked_execution.metadata.enrich(&worker_span);
278                tokio::spawn({
279                    let worker_span2 = worker_span.clone();
280                    let retry_config = self.config.retry_config;
281                    async move {
282                        let _permit = permit;
283                        let res = Self::run_worker(
284                            worker,
285                            db.as_ref(),
286                            clock_fn,
287                            locked_execution,
288                            retry_config,
289                            worker_span2,
290                        )
291                        .await;
292                        if let Err(db_error) = res {
293                            error!("Got db error `{db_error:?}`, expecting watcher to mark execution as timed out");
294                        }
295                    }
296                    .instrument(worker_span)
297                })
298            };
299            executions.push((execution_id, join_handle));
300        }
301        Ok(ExecutionProgress { executions })
302    }
303
304    async fn run_worker(
305        worker: Arc<dyn Worker>,
306        db_exec: &dyn DbExecutor,
307        clock_fn: C,
308        locked_execution: LockedExecution,
309        retry_config: ComponentRetryConfig,
310        worker_span: Span,
311    ) -> Result<(), DbErrorWrite> {
312        debug!("Worker::run starting");
313        trace!(
314            version = %locked_execution.next_version,
315            params = ?locked_execution.params,
316            event_history = ?locked_execution.event_history,
317            "Worker::run starting"
318        );
319        let can_be_retried = ExecutionLog::can_be_retried_after(
320            locked_execution.intermittent_event_count + 1,
321            retry_config.max_retries,
322            retry_config.retry_exp_backoff,
323        );
324        let unlock_expiry_on_limit_reached =
325            ExecutionLog::compute_retry_duration_when_retrying_forever(
326                locked_execution.intermittent_event_count + 1,
327                retry_config.retry_exp_backoff,
328            );
329        let ctx = WorkerContext {
330            execution_id: locked_execution.execution_id.clone(),
331            metadata: locked_execution.metadata,
332            ffqn: locked_execution.ffqn,
333            params: locked_execution.params,
334            event_history: locked_execution.event_history,
335            responses: locked_execution
336                .responses
337                .into_iter()
338                .map(|outer| outer.event)
339                .collect(),
340            version: locked_execution.next_version,
341            can_be_retried: can_be_retried.is_some(),
342            locked_event: locked_execution.locked_event,
343            worker_span,
344        };
345        let worker_result = worker.run(ctx).await;
346        trace!(?worker_result, "Worker::run finished");
347        let result_obtained_at = clock_fn.now();
348        match Self::worker_result_to_execution_event(
349            locked_execution.execution_id,
350            worker_result,
351            result_obtained_at,
352            locked_execution.parent,
353            can_be_retried,
354            unlock_expiry_on_limit_reached,
355        )? {
356            Some(append) => {
357                trace!("Appending {append:?}");
358                append.append(db_exec).await
359            }
360            None => Ok(()),
361        }
362    }
363
364    /// Map the `WorkerError` to an temporary or a permanent failure.
365    fn worker_result_to_execution_event(
366        execution_id: ExecutionId,
367        worker_result: WorkerResult,
368        result_obtained_at: DateTime<Utc>,
369        parent: Option<(ExecutionId, JoinSetId)>,
370        can_be_retried: Option<Duration>,
371        unlock_expiry_on_limit_reached: Duration,
372    ) -> Result<Option<Append>, DbErrorWrite> {
373        Ok(match worker_result {
374            WorkerResult::Ok(result, new_version, http_client_traces) => {
375                info!("Execution finished: {result}");
376                let child_finished =
377                    parent.map(
378                        |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
379                            parent_execution_id,
380                            parent_join_set,
381                            result: result.clone(),
382                        },
383                    );
384                let primary_event = AppendRequest {
385                    created_at: result_obtained_at,
386                    event: ExecutionEventInner::Finished {
387                        result,
388                        http_client_traces,
389                    },
390                };
391
392                Some(Append {
393                    created_at: result_obtained_at,
394                    primary_event,
395                    execution_id,
396                    version: new_version,
397                    child_finished,
398                })
399            }
400            WorkerResult::DbUpdatedByWorkerOrWatcher => None,
401            WorkerResult::Err(err) => {
402                let reason_generic = err.to_string(); // Override with err's reason if no information is lost.
403
404                let (primary_event, child_finished, version) = match err {
405                    WorkerError::TemporaryTimeout {
406                        http_client_traces,
407                        version,
408                    } => {
409                        if let Some(duration) = can_be_retried {
410                            let backoff_expires_at = result_obtained_at + duration;
411                            info!(
412                                "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
413                            );
414                            (
415                                ExecutionEventInner::TemporarilyTimedOut {
416                                    backoff_expires_at,
417                                    http_client_traces,
418                                },
419                                None,
420                                version,
421                            )
422                        } else {
423                            info!("Execution timed out");
424                            let result = SupportedFunctionReturnValue::ExecutionError(
425                                FinishedExecutionError {
426                                    kind: ExecutionFailureKind::TimedOut,
427                                    reason: None,
428                                    detail: None,
429                                },
430                            );
431                            let child_finished =
432                                parent.map(|(parent_execution_id, parent_join_set)| {
433                                    ChildFinishedResponse {
434                                        parent_execution_id,
435                                        parent_join_set,
436                                        result: result.clone(),
437                                    }
438                                });
439                            (
440                                ExecutionEventInner::Finished {
441                                    result,
442                                    http_client_traces,
443                                },
444                                child_finished,
445                                version,
446                            )
447                        }
448                    }
449                    WorkerError::DbError(db_error) => {
450                        return Err(db_error);
451                    }
452                    WorkerError::ActivityTrap {
453                        reason: _, // reason_generic contains trap_kind + reason
454                        trap_kind,
455                        detail,
456                        version,
457                        http_client_traces,
458                    } => {
459                        if let Some(duration) = can_be_retried {
460                            let expires_at = result_obtained_at + duration;
461                            debug!(
462                                "Retrying activity with `{trap_kind}` execution after {duration:?} at {expires_at}"
463                            );
464                            (
465                                ExecutionEventInner::TemporarilyFailed {
466                                    reason: StrVariant::from(reason_generic),
467                                    backoff_expires_at: expires_at,
468                                    detail,
469                                    http_client_traces,
470                                },
471                                None,
472                                version,
473                            )
474                        } else {
475                            info!(
476                                "Activity with `{trap_kind}` marked as permanent failure - {reason_generic}"
477                            );
478                            let result = SupportedFunctionReturnValue::ExecutionError(
479                                FinishedExecutionError {
480                                    reason: Some(reason_generic),
481                                    kind: ExecutionFailureKind::Uncategorized,
482                                    detail,
483                                },
484                            );
485                            let child_finished =
486                                parent.map(|(parent_execution_id, parent_join_set)| {
487                                    ChildFinishedResponse {
488                                        parent_execution_id,
489                                        parent_join_set,
490                                        result: result.clone(),
491                                    }
492                                });
493                            (
494                                ExecutionEventInner::Finished {
495                                    result,
496                                    http_client_traces,
497                                },
498                                child_finished,
499                                version,
500                            )
501                        }
502                    }
503                    WorkerError::ActivityPreopenedDirError {
504                        reason,
505                        detail,
506                        version,
507                    } => {
508                        let http_client_traces = None;
509                        if let Some(duration) = can_be_retried {
510                            let expires_at = result_obtained_at + duration;
511                            debug!(
512                                "Retrying activity with ActivityPreopenedDirError `{reason}` execution after {duration:?} at {expires_at}"
513                            );
514                            (
515                                ExecutionEventInner::TemporarilyFailed {
516                                    reason: StrVariant::from(reason_generic),
517                                    backoff_expires_at: expires_at,
518                                    detail: Some(detail),
519                                    http_client_traces,
520                                },
521                                None,
522                                version,
523                            )
524                        } else {
525                            info!(
526                                "Activity with ActivityPreopenedDirError `{reason}` marked as permanent failure - {reason_generic}"
527                            );
528                            let result = SupportedFunctionReturnValue::ExecutionError(
529                                FinishedExecutionError {
530                                    reason: Some(reason_generic),
531                                    kind: ExecutionFailureKind::Uncategorized,
532                                    detail: Some(detail),
533                                },
534                            );
535                            let child_finished =
536                                parent.map(|(parent_execution_id, parent_join_set)| {
537                                    ChildFinishedResponse {
538                                        parent_execution_id,
539                                        parent_join_set,
540                                        result: result.clone(),
541                                    }
542                                });
543                            (
544                                ExecutionEventInner::Finished {
545                                    result,
546                                    http_client_traces,
547                                },
548                                child_finished,
549                                version,
550                            )
551                        }
552                    }
553                    WorkerError::ActivityReturnedError {
554                        detail,
555                        version,
556                        http_client_traces,
557                    } => {
558                        let duration = can_be_retried.expect(
559                            "ActivityReturnedError must not be returned when retries are exhausted",
560                        );
561                        let expires_at = result_obtained_at + duration;
562                        debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
563                        (
564                            ExecutionEventInner::TemporarilyFailed {
565                                backoff_expires_at: expires_at,
566                                reason: StrVariant::Static("activity returned error"), // is same as the variant's display message.
567                                detail, // contains the backtrace
568                                http_client_traces,
569                            },
570                            None,
571                            version,
572                        )
573                    }
574                    WorkerError::LimitReached {
575                        reason,
576                        version: new_version,
577                    } => {
578                        let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
579                        warn!(
580                            "Limit reached: {reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
581                        );
582                        (
583                            ExecutionEventInner::Unlocked {
584                                backoff_expires_at: expires_at,
585                                reason: StrVariant::from(reason),
586                            },
587                            None,
588                            new_version,
589                        )
590                    }
591                    WorkerError::FatalError(fatal_error, version) => {
592                        warn!("Fatal worker error - {fatal_error:?}");
593                        let result = SupportedFunctionReturnValue::ExecutionError(
594                            FinishedExecutionError::from(fatal_error),
595                        );
596                        let child_finished =
597                            parent.map(|(parent_execution_id, parent_join_set)| {
598                                ChildFinishedResponse {
599                                    parent_execution_id,
600                                    parent_join_set,
601                                    result: result.clone(),
602                                }
603                            });
604                        (
605                            ExecutionEventInner::Finished {
606                                result,
607                                http_client_traces: None,
608                            },
609                            child_finished,
610                            version,
611                        )
612                    }
613                };
614                Some(Append {
615                    created_at: result_obtained_at,
616                    primary_event: AppendRequest {
617                        created_at: result_obtained_at,
618                        event: primary_event,
619                    },
620                    execution_id,
621                    version,
622                    child_finished,
623                })
624            }
625        })
626    }
627}
628
/// Result of a finished child execution, to be delivered to the parent's join set.
#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    /// The execution that awaits this child.
    pub(crate) parent_execution_id: ExecutionId,
    /// Join set of the parent that the response is appended to.
    pub(crate) parent_join_set: JoinSetId,
    /// The child's final return value (same value as in the child's `Finished` event).
    pub(crate) result: SupportedFunctionReturnValue,
}
635
/// A write derived from a worker result; persisted by `Append::append`.
#[derive(Debug, Clone)]
pub(crate) struct Append {
    /// Timestamp used for the parent response and passed to the batch append.
    pub(crate) created_at: DateTime<Utc>,
    /// The event appended to `execution_id`.
    pub(crate) primary_event: AppendRequest,
    pub(crate) execution_id: ExecutionId,
    /// Version at which `primary_event` is appended.
    pub(crate) version: Version,
    /// Present when a child execution finished; `primary_event` must then be `Finished`
    /// and `execution_id` must be a derived id (both asserted in `append`).
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}
644
645impl Append {
646    pub(crate) async fn append(self, db_exec: &dyn DbExecutor) -> Result<(), DbErrorWrite> {
647        if let Some(child_finished) = self.child_finished {
648            assert_matches!(
649                &self.primary_event,
650                AppendRequest {
651                    event: ExecutionEventInner::Finished { .. },
652                    ..
653                }
654            );
655            let child_execution_id = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
656            let events = AppendEventsToExecution {
657                execution_id: self.execution_id,
658                version: self.version.clone(),
659                batch: vec![self.primary_event],
660            };
661            let response = AppendResponseToExecution {
662                parent_execution_id: child_finished.parent_execution_id,
663                created_at: self.created_at,
664                join_set_id: child_finished.parent_join_set,
665                child_execution_id,
666                finished_version: self.version, // Since self.primary_event is a finished event, the version will remain the same.
667                result: child_finished.result,
668            };
669
670            db_exec
671                .append_batch_respond_to_parent(events, response, self.created_at)
672                .await?;
673        } else {
674            db_exec
675                .append(self.execution_id, self.version, self.primary_event)
676                .await?;
677        }
678        Ok(())
679    }
680}
681
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    //! A scripted [`Worker`] for tests. Each `run` pops the last queued entry, asserts the
    //! context's version and event history against it, and returns the queued result.

    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes, RETURN_TYPE_DUMMY,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");

    /// Queue of expected `version -> (event history, result)` entries; `run` pops from the end.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    impl SimpleWorker {
        /// Metadata describing a single exported, submittable function `ffqn` with no params.
        fn single_export(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
            [FunctionMetadata {
                ffqn,
                parameter_types: ParameterTypes::default(),
                return_type: RETURN_TYPE_DUMMY,
                extension: None,
                submittable: true,
            }]
        }

        /// A worker expecting exactly one run at version 2 with an empty event history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let queue: IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)> =
                IndexMap::from([(Version::new(2), (vec![], res))]);
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(queue)))
        }

        /// Replaces the exported function with `ffqn`, keeping the queued results.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            let exported = Self::single_export(ffqn.clone());
            Self {
                worker_results_rev: self.worker_results_rev,
                ffqn,
                exported,
            }
        }

        /// A worker exporting [`FFQN_SOME`] and replaying `worker_results_rev`.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: Self::single_export(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            let (expected_version, (expected_eh, worker_result)) =
                self.worker_results_rev.lock().unwrap().pop().unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            let actual_eh: Vec<_> = ctx
                .event_history
                .iter()
                .map(|(event, _version)| event.clone())
                .collect();
            assert_eq!(expected_eh, actual_eh);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}
767
768#[cfg(test)]
769mod tests {
770    use self::simple_worker::SimpleWorker;
771    use super::*;
772    use crate::{expired_timers_watcher, worker::WorkerResult};
773    use assert_matches::assert_matches;
774    use async_trait::async_trait;
775    use concepts::storage::{
776        CreateRequest, DbConnection, JoinSetRequest, JoinSetResponse, JoinSetResponseEvent,
777    };
778    use concepts::storage::{DbPoolCloseable, LockedBy};
779    use concepts::storage::{ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState};
780    use concepts::time::Now;
781    use concepts::{
782        FunctionMetadata, JoinSetKind, ParameterTypes, Params, RETURN_TYPE_DUMMY,
783        SUPPORTED_RETURN_VALUE_OK_EMPTY, StrVariant, SupportedFunctionReturnValue, TrapKind,
784    };
785    use db_tests::Database;
786    use indexmap::IndexMap;
787    use simple_worker::FFQN_SOME;
788    use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
789    use test_utils::set_up;
790    use test_utils::sim_clock::{ConstClock, SimClock};
791
    /// FFQN of the child execution spawned in `child_execution_permanently_failed_should_notify_parent`.
    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
793
794    async fn tick_fn<W: Worker + Debug, C: ClockFn + 'static>(
795        config: ExecConfig,
796        clock_fn: C,
797        db_exec: Arc<dyn DbExecutor>,
798        worker: Arc<W>,
799        executed_at: DateTime<Utc>,
800    ) -> Vec<ExecutionId> {
801        trace!("Ticking with {worker:?}");
802        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
803        let executor = ExecTask::new_test(worker, config, clock_fn, db_exec, ffqns);
804        executor
805            .tick_test_await(executed_at, RunId::generate())
806            .await
807    }
808
809    #[tokio::test]
810    async fn execute_simple_lifecycle_tick_based_mem() {
811        let created_at = Now.now();
812        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
813        execute_simple_lifecycle_tick_based(
814            db_pool.connection().as_ref(),
815            db_exec.clone(),
816            ConstClock(created_at),
817        )
818        .await;
819        db_close.close().await;
820    }
821
822    #[tokio::test]
823    async fn execute_simple_lifecycle_tick_based_sqlite() {
824        let created_at = Now.now();
825        let (_guard, db_pool, db_exec, db_close) = Database::Sqlite.set_up().await;
826        execute_simple_lifecycle_tick_based(
827            db_pool.connection().as_ref(),
828            db_exec.clone(),
829            ConstClock(created_at),
830        )
831        .await;
832        db_close.close().await;
833    }
834
835    async fn execute_simple_lifecycle_tick_based<C: ClockFn + 'static>(
836        db_connection: &dyn DbConnection,
837        db_exec: Arc<dyn DbExecutor>,
838        clock_fn: C,
839    ) {
840        set_up();
841        let created_at = clock_fn.now();
842        let exec_config = ExecConfig {
843            batch_size: 1,
844            lock_expiry: Duration::from_secs(1),
845            tick_sleep: Duration::from_millis(100),
846            component_id: ComponentId::dummy_activity(),
847            task_limiter: None,
848            executor_id: ExecutorId::generate(),
849            retry_config: ComponentRetryConfig::ZERO,
850        };
851
852        let execution_log = create_and_tick(
853            CreateAndTickConfig {
854                execution_id: ExecutionId::generate(),
855                created_at,
856                executed_at: created_at,
857            },
858            clock_fn,
859            db_connection,
860            db_exec,
861            exec_config,
862            Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
863                SUPPORTED_RETURN_VALUE_OK_EMPTY,
864                Version::new(2),
865                None,
866            ))),
867            tick_fn,
868        )
869        .await;
870        assert_matches!(
871            execution_log.events.get(2).unwrap(),
872            ExecutionEvent {
873                event: ExecutionEventInner::Finished {
874                    result: SupportedFunctionReturnValue::Ok { ok: None },
875                    http_client_traces: None
876                },
877                created_at: _,
878                backtrace_id: None,
879                version: Version(2),
880            }
881        );
882    }
883
884    #[tokio::test]
885    async fn execute_simple_lifecycle_task_based_mem() {
886        set_up();
887        let created_at = Now.now();
888        let clock_fn = ConstClock(created_at);
889        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
890        let exec_config = ExecConfig {
891            batch_size: 1,
892            lock_expiry: Duration::from_secs(1),
893            tick_sleep: Duration::ZERO,
894            component_id: ComponentId::dummy_activity(),
895            task_limiter: None,
896            executor_id: ExecutorId::generate(),
897            retry_config: ComponentRetryConfig::ZERO,
898        };
899
900        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
901            SUPPORTED_RETURN_VALUE_OK_EMPTY,
902            Version::new(2),
903            None,
904        )));
905
906        let execution_log = create_and_tick(
907            CreateAndTickConfig {
908                execution_id: ExecutionId::generate(),
909                created_at,
910                executed_at: created_at,
911            },
912            clock_fn,
913            db_pool.connection().as_ref(),
914            db_exec,
915            exec_config,
916            worker,
917            tick_fn,
918        )
919        .await;
920        assert_matches!(
921            execution_log.events.get(2).unwrap(),
922            ExecutionEvent {
923                event: ExecutionEventInner::Finished {
924                    result: SupportedFunctionReturnValue::Ok { ok: None },
925                    http_client_traces: None
926                },
927                created_at: _,
928                backtrace_id: None,
929                version: Version(2),
930            }
931        );
932        db_close.close().await;
933    }
934
    /// Inputs for `create_and_tick`: which execution to create and the timestamps to use.
    struct CreateAndTickConfig {
        /// ID of the execution to create.
        execution_id: ExecutionId,
        /// Used as both `created_at` and `scheduled_at` of the created execution.
        created_at: DateTime<Utc>,
        /// Timestamp passed to the tick function.
        executed_at: DateTime<Utc>,
    }
940
941    async fn create_and_tick<
942        W: Worker,
943        C: ClockFn,
944        T: FnMut(ExecConfig, C, Arc<dyn DbExecutor>, Arc<W>, DateTime<Utc>) -> F,
945        F: Future<Output = Vec<ExecutionId>>,
946    >(
947        config: CreateAndTickConfig,
948        clock_fn: C,
949        db_connection: &dyn DbConnection,
950        db_exec: Arc<dyn DbExecutor>,
951        exec_config: ExecConfig,
952        worker: Arc<W>,
953        mut tick: T,
954    ) -> ExecutionLog {
955        // Create an execution
956        db_connection
957            .create(CreateRequest {
958                created_at: config.created_at,
959                execution_id: config.execution_id.clone(),
960                ffqn: FFQN_SOME,
961                params: Params::empty(),
962                parent: None,
963                metadata: concepts::ExecutionMetadata::empty(),
964                scheduled_at: config.created_at,
965                component_id: ComponentId::dummy_activity(),
966                scheduled_by: None,
967            })
968            .await
969            .unwrap();
970        // execute!
971        tick(exec_config, clock_fn, db_exec, worker, config.executed_at).await;
972        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
973        debug!("Execution history after tick: {execution_log:?}");
974        // check that DB contains Created and Locked events.
975        assert_matches!(
976            execution_log.events.first().unwrap(),
977            ExecutionEvent {
978                event: ExecutionEventInner::Created { .. },
979                created_at: actually_created_at,
980                backtrace_id: None,
981                version: Version(0),
982            }
983            if config.created_at == *actually_created_at
984        );
985        let locked_at = assert_matches!(
986            execution_log.events.get(1).unwrap(),
987            ExecutionEvent {
988                event: ExecutionEventInner::Locked { .. },
989                created_at: locked_at,
990                backtrace_id: None,
991                version: Version(1),
992            } if config.created_at <= *locked_at
993            => *locked_at
994        );
995        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
996            event: _,
997            created_at: executed_at,
998            backtrace_id: None,
999            version: Version(2),
1000        } if *executed_at >= locked_at);
1001        execution_log
1002    }
1003
    /// An activity trap with one retry configured must first record `TemporarilyFailed`.
    ///
    /// Expected sequence:
    /// - The first run records `TemporarilyFailed` at version 2, with a backoff of
    ///   `retry_exp_backoff`.
    /// - Ticking again before the backoff expires must not pick the execution up.
    /// - Once the backoff elapses, the retry is locked at version 3 and finishes successfully
    ///   at version 4.
    #[tokio::test]
    async fn activity_trap_should_trigger_an_execution_retry() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
        let retry_exp_backoff = Duration::from_millis(100);
        // Allow exactly one retry so the trap becomes a temporary failure, not a final one.
        let retry_config = ComponentRetryConfig {
            max_retries: 1,
            retry_exp_backoff,
        };
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
            retry_config,
        };
        let expected_reason = "error reason";
        let expected_detail = "error detail";
        // First run: the worker reports an activity trap at version 2.
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: Some(expected_detail.to_string()),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at: sim_clock.now(),
                executed_at: sim_clock.now(),
            },
            sim_clock.clone(),
            db_pool.connection().as_ref(),
            db_exec.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        // Created, Locked, TemporarilyFailed.
        assert_eq!(3, execution_log.events.len());
        {
            let (reason, detail, at, expires_at) = assert_matches!(
                &execution_log.events.get(2).unwrap(),
                ExecutionEvent {
                    event: ExecutionEventInner::TemporarilyFailed {
                        reason,
                        detail,
                        backoff_expires_at,
                        http_client_traces: None,
                    },
                    created_at: at,
                    backtrace_id: None,
                    version: Version(2),
                }
                => (reason, detail, *at, *backoff_expires_at)
            );
            // The recorded reason carries an "activity trap: " prefix.
            assert_eq!(format!("activity trap: {expected_reason}"), reason.deref());
            assert_eq!(Some(expected_detail), detail.as_deref());
            assert_eq!(at, sim_clock.now());
            assert_eq!(sim_clock.now() + retry_config.retry_exp_backoff, expires_at);
        }
        // Retry run: after TemporarilyFailed (2) and the retry's Locked (3), the worker is
        // expected to run at version 4 with an empty event history and succeed.
        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
            std::sync::Mutex::new(IndexMap::from([(
                Version::new(4),
                (
                    vec![],
                    WorkerResult::Ok(SUPPORTED_RETURN_VALUE_OK_EMPTY, Version::new(4), None),
                ),
            )])),
        )));
        // noop until `retry_exp_backoff` expires
        assert!(
            tick_fn(
                exec_config.clone(),
                sim_clock.clone(),
                db_exec.clone(),
                worker.clone(),
                sim_clock.now(),
            )
            .await
            .is_empty()
        );
        // tick again to finish the execution
        sim_clock.move_time_forward(retry_config.retry_exp_backoff);
        tick_fn(
            exec_config,
            sim_clock.clone(),
            db_exec.clone(),
            worker,
            sim_clock.now(),
        )
        .await;
        let execution_log = {
            let db_connection = db_pool.connection();
            db_connection
                .get(&execution_log.execution_id)
                .await
                .unwrap()
        };
        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
        // The retry is locked at the moment the backoff expired...
        assert_matches!(
            execution_log.events.get(3).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: at,
                backtrace_id: None,
                version: Version(3),
            } if *at == sim_clock.now()
        );
        // ...and finishes successfully at the same simulated instant.
        assert_matches!(
            execution_log.events.get(4).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: SupportedFunctionReturnValue::Ok{ok:None},
                    http_client_traces: None
                },
                created_at: finished_at,
                backtrace_id: None,
                version: Version(4),
            } if *finished_at == sim_clock.now()
        );
        db_close.close().await;
    }
1133
1134    #[tokio::test]
1135    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1136        set_up();
1137        let created_at = Now.now();
1138        let clock_fn = ConstClock(created_at);
1139        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1140        let exec_config = ExecConfig {
1141            batch_size: 1,
1142            lock_expiry: Duration::from_secs(1),
1143            tick_sleep: Duration::ZERO,
1144            component_id: ComponentId::dummy_activity(),
1145            task_limiter: None,
1146            executor_id: ExecutorId::generate(),
1147            retry_config: ComponentRetryConfig::ZERO,
1148        };
1149
1150        let reason = "error reason";
1151        let expected_reason = format!("activity trap: {reason}");
1152        let expected_detail = "error detail";
1153        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1154            WorkerError::ActivityTrap {
1155                reason: reason.to_string(),
1156                trap_kind: concepts::TrapKind::Trap,
1157                detail: Some(expected_detail.to_string()),
1158                version: Version::new(2),
1159                http_client_traces: None,
1160            },
1161        )));
1162        let execution_log = create_and_tick(
1163            CreateAndTickConfig {
1164                execution_id: ExecutionId::generate(),
1165                created_at,
1166                executed_at: created_at,
1167            },
1168            clock_fn,
1169            db_pool.connection().as_ref(),
1170            db_exec,
1171            exec_config.clone(),
1172            worker,
1173            tick_fn,
1174        )
1175        .await;
1176        assert_eq!(3, execution_log.events.len());
1177        let (reason, kind, detail) = assert_matches!(
1178            &execution_log.events.get(2).unwrap(),
1179            ExecutionEvent {
1180                event: ExecutionEventInner::Finished{
1181                    result: SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError{reason, kind, detail}),
1182                    http_client_traces: None
1183                },
1184                created_at: at,
1185                backtrace_id: None,
1186                version: Version(2),
1187            } if *at == created_at
1188            => (reason, kind, detail)
1189        );
1190
1191        assert_eq!(Some(expected_reason), *reason);
1192        assert_eq!(Some(expected_detail), detail.as_deref());
1193        assert_eq!(ExecutionFailureKind::Uncategorized, *kind);
1194
1195        db_close.close().await;
1196    }
1197
1198    #[tokio::test]
1199    async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1200        let worker_error = WorkerError::ActivityTrap {
1201            reason: "error reason".to_string(),
1202            trap_kind: TrapKind::Trap,
1203            detail: Some("detail".to_string()),
1204            version: Version::new(2),
1205            http_client_traces: None,
1206        };
1207        let expected_child_err = FinishedExecutionError {
1208            kind: ExecutionFailureKind::Uncategorized,
1209            reason: Some("activity trap: error reason".to_string()),
1210            detail: Some("detail".to_string()),
1211        };
1212        child_execution_permanently_failed_should_notify_parent(
1213            WorkerResult::Err(worker_error),
1214            expected_child_err,
1215        )
1216        .await;
1217    }
1218
1219    #[tokio::test]
1220    async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1221        let expected_child_err = FinishedExecutionError {
1222            kind: ExecutionFailureKind::TimedOut,
1223            reason: None,
1224            detail: None,
1225        };
1226        child_execution_permanently_failed_should_notify_parent(
1227            WorkerResult::DbUpdatedByWorkerOrWatcher,
1228            expected_child_err,
1229        )
1230        .await;
1231    }
1232
1233    async fn child_execution_permanently_failed_should_notify_parent(
1234        worker_result: WorkerResult,
1235        expected_child_err: FinishedExecutionError,
1236    ) {
1237        use concepts::storage::JoinSetResponseEventOuter;
1238        const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1239
1240        set_up();
1241        let sim_clock = SimClock::default();
1242        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1243
1244        let parent_worker = Arc::new(SimpleWorker::with_single_result(
1245            WorkerResult::DbUpdatedByWorkerOrWatcher,
1246        ));
1247        let parent_execution_id = ExecutionId::generate();
1248        db_pool
1249            .connection()
1250            .create(CreateRequest {
1251                created_at: sim_clock.now(),
1252                execution_id: parent_execution_id.clone(),
1253                ffqn: FFQN_SOME,
1254                params: Params::empty(),
1255                parent: None,
1256                metadata: concepts::ExecutionMetadata::empty(),
1257                scheduled_at: sim_clock.now(),
1258                component_id: ComponentId::dummy_activity(),
1259                scheduled_by: None,
1260            })
1261            .await
1262            .unwrap();
1263        let parent_executor_id = ExecutorId::generate();
1264        tick_fn(
1265            ExecConfig {
1266                batch_size: 1,
1267                lock_expiry: LOCK_EXPIRY,
1268                tick_sleep: Duration::ZERO,
1269                component_id: ComponentId::dummy_activity(),
1270                task_limiter: None,
1271                executor_id: parent_executor_id,
1272                retry_config: ComponentRetryConfig::ZERO,
1273            },
1274            sim_clock.clone(),
1275            db_exec.clone(),
1276            parent_worker,
1277            sim_clock.now(),
1278        )
1279        .await;
1280
1281        let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1282        let child_execution_id = parent_execution_id.next_level(&join_set_id);
1283        // executor does not append anything, this should have been written by the worker:
1284        {
1285            let params = Params::empty();
1286            let child = CreateRequest {
1287                created_at: sim_clock.now(),
1288                execution_id: ExecutionId::Derived(child_execution_id.clone()),
1289                ffqn: FFQN_CHILD,
1290                params: params.clone(),
1291                parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1292                metadata: concepts::ExecutionMetadata::empty(),
1293                scheduled_at: sim_clock.now(),
1294                component_id: ComponentId::dummy_activity(),
1295                scheduled_by: None,
1296            };
1297            let current_time = sim_clock.now();
1298            let join_set = AppendRequest {
1299                created_at: current_time,
1300                event: ExecutionEventInner::HistoryEvent {
1301                    event: HistoryEvent::JoinSetCreate {
1302                        join_set_id: join_set_id.clone(),
1303                    },
1304                },
1305            };
1306            let child_exec_req = AppendRequest {
1307                created_at: current_time,
1308                event: ExecutionEventInner::HistoryEvent {
1309                    event: HistoryEvent::JoinSetRequest {
1310                        join_set_id: join_set_id.clone(),
1311                        request: JoinSetRequest::ChildExecutionRequest {
1312                            child_execution_id: child_execution_id.clone(),
1313                            target_ffqn: FFQN_CHILD,
1314                            params,
1315                        },
1316                    },
1317                },
1318            };
1319            let join_next = AppendRequest {
1320                created_at: current_time,
1321                event: ExecutionEventInner::HistoryEvent {
1322                    event: HistoryEvent::JoinNext {
1323                        join_set_id: join_set_id.clone(),
1324                        run_expires_at: sim_clock.now(),
1325                        closing: false,
1326                        requested_ffqn: Some(FFQN_CHILD),
1327                    },
1328                },
1329            };
1330            db_pool
1331                .connection()
1332                .append_batch_create_new_execution(
1333                    current_time,
1334                    vec![join_set, child_exec_req, join_next],
1335                    parent_execution_id.clone(),
1336                    Version::new(2),
1337                    vec![child],
1338                )
1339                .await
1340                .unwrap();
1341        }
1342
1343        let child_worker =
1344            Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));
1345
1346        // execute the child
1347        tick_fn(
1348            ExecConfig {
1349                batch_size: 1,
1350                lock_expiry: LOCK_EXPIRY,
1351                tick_sleep: Duration::ZERO,
1352                component_id: ComponentId::dummy_activity(),
1353                task_limiter: None,
1354                executor_id: ExecutorId::generate(),
1355                retry_config: ComponentRetryConfig::ZERO,
1356            },
1357            sim_clock.clone(),
1358            db_exec.clone(),
1359            child_worker,
1360            sim_clock.now(),
1361        )
1362        .await;
1363        if matches!(expected_child_err.kind, ExecutionFailureKind::TimedOut) {
1364            // In case of timeout, let the timers watcher handle it
1365            sim_clock.move_time_forward(LOCK_EXPIRY);
1366            expired_timers_watcher::tick(db_pool.connection().as_ref(), sim_clock.now())
1367                .await
1368                .unwrap();
1369        }
1370        let child_log = db_pool
1371            .connection()
1372            .get(&ExecutionId::Derived(child_execution_id.clone()))
1373            .await
1374            .unwrap();
1375        assert!(child_log.pending_state.is_finished());
1376        assert_eq!(
1377            Version(2),
1378            child_log.next_version,
1379            "created = 0, locked = 1, with_single_result = 2"
1380        );
1381        assert_eq!(
1382            ExecutionEventInner::Finished {
1383                result: SupportedFunctionReturnValue::ExecutionError(expected_child_err),
1384                http_client_traces: None
1385            },
1386            child_log.last_event().event
1387        );
1388        let parent_log = db_pool
1389            .connection()
1390            .get(&parent_execution_id)
1391            .await
1392            .unwrap();
1393        assert_matches!(
1394            parent_log.pending_state,
1395            PendingState::PendingAt {
1396                scheduled_at,
1397                last_lock: Some(LockedBy { executor_id: found_executor_id, run_id: _}),
1398            } if scheduled_at == sim_clock.now() && found_executor_id == parent_executor_id,
1399            "parent should be back to pending"
1400        );
1401        let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1402            parent_log.responses.last(),
1403            Some(JoinSetResponseEventOuter{
1404                created_at: at,
1405                event: JoinSetResponseEvent{
1406                    join_set_id: found_join_set_id,
1407                    event: JoinSetResponse::ChildExecutionFinished {
1408                        child_execution_id: found_child_execution_id,
1409                        finished_version,
1410                        result: found_result,
1411                    }
1412                }
1413            })
1414             if *at == sim_clock.now()
1415            => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1416        );
1417        assert_eq!(join_set_id, *found_join_set_id);
1418        assert_eq!(child_execution_id, *found_child_execution_id);
1419        assert_eq!(child_log.next_version, *child_finished_version);
1420        assert_matches!(
1421            found_result,
1422            SupportedFunctionReturnValue::ExecutionError(_)
1423        );
1424
1425        db_close.close().await;
1426    }
1427
    /// Worker that sleeps for `duration` via `tokio::time::sleep`, then succeeds with `result`.
    #[derive(Clone, Debug)]
    struct SleepyWorker {
        /// How long `run` sleeps before returning.
        duration: Duration,
        /// Value returned (cloned) on every successful run.
        result: SupportedFunctionReturnValue,
        /// Metadata of the single exported function, returned by `exported_functions`.
        exported: [FunctionMetadata; 1],
    }
1434
1435    #[async_trait]
1436    impl Worker for SleepyWorker {
1437        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1438            tokio::time::sleep(self.duration).await;
1439            WorkerResult::Ok(self.result.clone(), ctx.version, None)
1440        }
1441
1442        fn exported_functions(&self) -> &[FunctionMetadata] {
1443            &self.exported
1444        }
1445    }
1446
1447    #[tokio::test]
1448    async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1449        set_up();
1450        let sim_clock = SimClock::default();
1451        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1452        let lock_expiry = Duration::from_millis(100);
1453        let timeout_duration = Duration::from_millis(300);
1454        let retry_config = ComponentRetryConfig {
1455            max_retries: 1,
1456            retry_exp_backoff: timeout_duration,
1457        };
1458        let exec_config = ExecConfig {
1459            batch_size: 1,
1460            lock_expiry,
1461            tick_sleep: Duration::ZERO,
1462            component_id: ComponentId::dummy_activity(),
1463            task_limiter: None,
1464            executor_id: ExecutorId::generate(),
1465            retry_config,
1466        };
1467
1468        let worker = Arc::new(SleepyWorker {
1469            duration: lock_expiry + Duration::from_millis(1), // sleep more than allowed by the lock expiry
1470            result: SUPPORTED_RETURN_VALUE_OK_EMPTY,
1471            exported: [FunctionMetadata {
1472                ffqn: FFQN_SOME,
1473                parameter_types: ParameterTypes::default(),
1474                return_type: RETURN_TYPE_DUMMY,
1475                extension: None,
1476                submittable: true,
1477            }],
1478        });
1479        // Create an execution
1480        let execution_id = ExecutionId::generate();
1481        let db_connection = db_pool.connection();
1482        db_connection
1483            .create(CreateRequest {
1484                created_at: sim_clock.now(),
1485                execution_id: execution_id.clone(),
1486                ffqn: FFQN_SOME,
1487                params: Params::empty(),
1488                parent: None,
1489                metadata: concepts::ExecutionMetadata::empty(),
1490                scheduled_at: sim_clock.now(),
1491                component_id: ComponentId::dummy_activity(),
1492                scheduled_by: None,
1493            })
1494            .await
1495            .unwrap();
1496
1497        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
1498        let executor = ExecTask::new_test(
1499            worker,
1500            exec_config.clone(),
1501            sim_clock.clone(),
1502            db_exec.clone(),
1503            ffqns,
1504        );
1505        let mut first_execution_progress = executor
1506            .tick(sim_clock.now(), RunId::generate())
1507            .await
1508            .unwrap();
1509        assert_eq!(1, first_execution_progress.executions.len());
1510        // Started hanging, wait for lock expiry.
1511        sim_clock.move_time_forward(lock_expiry);
1512        // cleanup should be called
1513        let now_after_first_lock_expiry = sim_clock.now();
1514        {
1515            debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1516            let cleanup_progress = executor
1517                .tick(now_after_first_lock_expiry, RunId::generate())
1518                .await
1519                .unwrap();
1520            assert!(cleanup_progress.executions.is_empty());
1521        }
1522        {
1523            let expired_locks = expired_timers_watcher::tick(
1524                db_pool.connection().as_ref(),
1525                now_after_first_lock_expiry,
1526            )
1527            .await
1528            .unwrap()
1529            .expired_locks;
1530            assert_eq!(1, expired_locks);
1531        }
1532        assert!(
1533            !first_execution_progress
1534                .executions
1535                .pop()
1536                .unwrap()
1537                .1
1538                .is_finished()
1539        );
1540
1541        let execution_log = db_connection.get(&execution_id).await.unwrap();
1542        let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1543        assert_matches!(
1544            &execution_log.events.get(2).unwrap(),
1545            ExecutionEvent {
1546                event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
1547                created_at: at,
1548                backtrace_id: None,
1549                version: Version(2),
1550            } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1551        );
1552        assert_matches!(
1553            execution_log.pending_state,
1554            PendingState::PendingAt {
1555                scheduled_at: found_scheduled_by,
1556                last_lock: Some(LockedBy {
1557                    executor_id: found_executor_id,
1558                    run_id: _,
1559                }),
1560            } if found_scheduled_by == expected_first_timeout_expiry && found_executor_id == exec_config.executor_id
1561        );
1562        sim_clock.move_time_forward(timeout_duration);
1563        let now_after_first_timeout = sim_clock.now();
1564        debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1565
1566        let mut second_execution_progress = executor
1567            .tick(now_after_first_timeout, RunId::generate())
1568            .await
1569            .unwrap();
1570        assert_eq!(1, second_execution_progress.executions.len());
1571
1572        // Started hanging, wait for lock expiry.
1573        sim_clock.move_time_forward(lock_expiry);
1574        // cleanup should be called
1575        let now_after_second_lock_expiry = sim_clock.now();
1576        debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1577        {
1578            let cleanup_progress = executor
1579                .tick(now_after_second_lock_expiry, RunId::generate())
1580                .await
1581                .unwrap();
1582            assert!(cleanup_progress.executions.is_empty());
1583        }
1584        {
1585            let expired_locks = expired_timers_watcher::tick(
1586                db_pool.connection().as_ref(),
1587                now_after_second_lock_expiry,
1588            )
1589            .await
1590            .unwrap()
1591            .expired_locks;
1592            assert_eq!(1, expired_locks);
1593        }
1594        assert!(
1595            !second_execution_progress
1596                .executions
1597                .pop()
1598                .unwrap()
1599                .1
1600                .is_finished()
1601        );
1602
1603        drop(db_connection);
1604        drop(executor);
1605        db_close.close().await;
1606    }
1607}