obeli_sk_executor/
executor.rs

1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::prefixed_ulid::RunId;
5use concepts::storage::{
6    AppendRequest, DbPool, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
7    LockedExecution,
8};
9use concepts::time::ClockFn;
10use concepts::{ComponentId, FinishedExecutionResult, FunctionMetadata, StrVariant};
11use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
12use concepts::{
13    FinishedExecutionError,
14    storage::{DbConnection, DbError, ExecutionEventInner, JoinSetResponse, Version},
15};
16use concepts::{JoinSetId, PermanentFailureKind};
17use std::fmt::Display;
18use std::{
19    sync::{
20        Arc,
21        atomic::{AtomicBool, Ordering},
22    },
23    time::Duration,
24};
25use tokio::task::{AbortHandle, JoinHandle};
26use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
27
/// Configuration of a single executor, consumed by `ExecTask`.
#[derive(Debug, Clone)]
pub struct ExecConfig {
    /// Added to the tick time to compute the lock expiry passed to `lock_pending`;
    /// the same instant is handed to the worker as its execution deadline.
    pub lock_expiry: Duration,
    /// Passed to `wait_for_pending` between two ticks of a spawned executor
    /// (presumably the maximum time to wait for new pending executions - confirm in the db layer).
    pub tick_sleep: Duration,
    /// Upper bound on the number of executions requested from `lock_pending` in one tick.
    pub batch_size: u32,
    /// Component ID passed to `lock_pending` and attached to the tracing spans.
    pub component_id: ComponentId,
    /// Optional semaphore. When set, one permit is acquired per execution before locking
    /// and is held until the spawned worker task ends.
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
    /// Executor ID passed to `lock_pending` and attached to the tracing spans.
    pub executor_id: ExecutorId,
}
37
/// Executor that locks pending executions of the worker's exported functions
/// and runs each of them on the worker in its own tokio task.
pub struct ExecTask<C: ClockFn> {
    /// Worker that runs every locked execution.
    worker: Arc<dyn Worker>,
    /// Executor settings (batch size, lock expiry, IDs, optional task limiter).
    config: ExecConfig,
    clock_fn: C, // Used for obtaining current time when the execution finishes.
    /// Source of database connections used for locking and for appending results.
    db_pool: Arc<dyn DbPool>,
    /// Functions whose pending executions this executor locks.
    ffqns: Arc<[FunctionFqn]>,
}
45
46#[derive(derive_more::Debug, Default)]
47pub struct ExecutionProgress {
48    #[debug(skip)]
49    #[allow(dead_code)]
50    executions: Vec<(ExecutionId, JoinHandle<()>)>,
51}
52
53impl ExecutionProgress {
54    #[cfg(feature = "test")]
55    pub async fn wait_for_tasks(self) -> Result<usize, tokio::task::JoinError> {
56        let execs = self.executions.len();
57        for (_, handle) in self.executions {
58            handle.await?;
59        }
60        Ok(execs)
61    }
62}
63
64#[derive(derive_more::Debug)]
65pub struct ExecutorTaskHandle {
66    #[debug(skip)]
67    is_closing: Arc<AtomicBool>,
68    #[debug(skip)]
69    abort_handle: AbortHandle,
70    component_id: ComponentId,
71    executor_id: ExecutorId,
72}
73
74impl ExecutorTaskHandle {
75    #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
76    pub async fn close(&self) {
77        trace!("Gracefully closing");
78        self.is_closing.store(true, Ordering::Relaxed);
79        while !self.abort_handle.is_finished() {
80            tokio::time::sleep(Duration::from_millis(1)).await;
81        }
82        debug!("Gracefully closed");
83    }
84}
85
86impl Drop for ExecutorTaskHandle {
87    #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
88    fn drop(&mut self) {
89        if self.abort_handle.is_finished() {
90            return;
91        }
92        warn!("Aborting the executor task");
93        self.abort_handle.abort();
94    }
95}
96
/// Public wrapper around `extract_exported_ffqns_noext`, compiled only with the `test` feature.
#[cfg(feature = "test")]
pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    extract_exported_ffqns_noext(worker)
}
101
102fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
103    worker
104        .exported_functions()
105        .iter()
106        .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
107        .collect::<Arc<_>>()
108}
109
110impl<C: ClockFn + 'static> ExecTask<C> {
111    #[cfg(feature = "test")]
112    pub fn new(
113        worker: Arc<dyn Worker>,
114        config: ExecConfig,
115        clock_fn: C,
116        db_pool: Arc<dyn DbPool>,
117        ffqns: Arc<[FunctionFqn]>,
118    ) -> Self {
119        Self {
120            worker,
121            config,
122            clock_fn,
123            db_pool,
124            ffqns,
125        }
126    }
127
128    pub fn spawn_new(
129        worker: Arc<dyn Worker>,
130        config: ExecConfig,
131        clock_fn: C,
132        db_pool: Arc<dyn DbPool>,
133    ) -> ExecutorTaskHandle {
134        let is_closing = Arc::new(AtomicBool::default());
135        let is_closing_inner = is_closing.clone();
136        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
137        let component_id = config.component_id.clone();
138        let executor_id = config.executor_id;
139        let abort_handle = tokio::spawn(async move {
140            debug!(executor_id = %config.executor_id, component_id = %config.component_id, "Spawned executor");
141            let task = Self {
142                worker,
143                config,
144                db_pool,
145                ffqns: ffqns.clone(),
146                clock_fn: clock_fn.clone(),
147            };
148            loop {
149                let _ = task.tick(clock_fn.now(), RunId::generate()).await;
150                let executed_at = clock_fn.now();
151                task.db_pool
152                    .connection()
153                    .wait_for_pending(executed_at, ffqns.clone(), task.config.tick_sleep)
154                    .await;
155                if is_closing_inner.load(Ordering::Relaxed) {
156                    return;
157                }
158            }
159        })
160        .abort_handle();
161        ExecutorTaskHandle {
162            is_closing,
163            abort_handle,
164            component_id,
165            executor_id,
166        }
167    }
168
169    fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
170        if let Some(task_limiter) = &self.config.task_limiter {
171            let mut locks = Vec::new();
172            for _ in 0..self.config.batch_size {
173                if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
174                    locks.push(Some(permit));
175                } else {
176                    break;
177                }
178            }
179            locks
180        } else {
181            let mut vec = Vec::with_capacity(self.config.batch_size as usize);
182            for _ in 0..self.config.batch_size {
183                vec.push(None);
184            }
185            vec
186        }
187    }
188
189    #[cfg(feature = "test")]
190    pub async fn tick_test(
191        &self,
192        executed_at: DateTime<Utc>,
193        run_id: RunId,
194    ) -> Result<ExecutionProgress, ()> {
195        self.tick(executed_at, run_id).await
196    }
197
198    #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.config.executor_id, component_id = %self.config.component_id))]
199    async fn tick(
200        &self,
201        executed_at: DateTime<Utc>,
202        run_id: RunId,
203    ) -> Result<ExecutionProgress, ()> {
204        let locked_executions = {
205            let mut permits = self.acquire_task_permits();
206            if permits.is_empty() {
207                return Ok(ExecutionProgress::default());
208            }
209            let db_connection = self.db_pool.connection();
210            let lock_expires_at = executed_at + self.config.lock_expiry;
211            let locked_executions = db_connection
212                .lock_pending(
213                    permits.len(), // batch size
214                    executed_at,   // fetch expiring before now
215                    self.ffqns.clone(),
216                    executed_at, // created at
217                    self.config.component_id.clone(),
218                    self.config.executor_id,
219                    lock_expires_at,
220                    run_id,
221                )
222                .await
223                .map_err(|err| {
224                    warn!("lock_pending error {err:?}");
225                })?;
226            // Drop permits if too many were allocated.
227            while permits.len() > locked_executions.len() {
228                permits.pop();
229            }
230            assert_eq!(permits.len(), locked_executions.len());
231            locked_executions.into_iter().zip(permits)
232        };
233        let execution_deadline = executed_at + self.config.lock_expiry;
234
235        let mut executions = Vec::with_capacity(locked_executions.len());
236        for (locked_execution, permit) in locked_executions {
237            let execution_id = locked_execution.execution_id.clone();
238            let join_handle = {
239                let worker = self.worker.clone();
240                let db_pool = self.db_pool.clone();
241                let clock_fn = self.clock_fn.clone();
242                let run_id = locked_execution.run_id;
243                let worker_span = info_span!(parent: None, "worker",
244                    "otel.name" = format!("worker {}", locked_execution.ffqn),
245                    %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.config.executor_id, component_id = %self.config.component_id);
246                locked_execution.metadata.enrich(&worker_span);
247                tokio::spawn({
248                    let worker_span2 = worker_span.clone();
249                    async move {
250                        let res = Self::run_worker(
251                            worker,
252                            db_pool.as_ref(),
253                            execution_deadline,
254                            clock_fn,
255                            locked_execution,
256                            worker_span2,
257                        )
258                        .await;
259                        if let Err(db_error) = res {
260                            error!("Execution will be timed out not writing `{db_error:?}`");
261                        }
262                        drop(permit);
263                    }
264                    .instrument(worker_span)
265                })
266            };
267            executions.push((execution_id, join_handle));
268        }
269        Ok(ExecutionProgress { executions })
270    }
271
272    async fn run_worker(
273        worker: Arc<dyn Worker>,
274        db_pool: &dyn DbPool,
275        execution_deadline: DateTime<Utc>,
276        clock_fn: C,
277        locked_execution: LockedExecution,
278        worker_span: Span,
279    ) -> Result<(), DbError> {
280        debug!("Worker::run starting");
281        trace!(
282            version = %locked_execution.version,
283            params = ?locked_execution.params,
284            event_history = ?locked_execution.event_history,
285            "Worker::run starting"
286        );
287        let can_be_retried = ExecutionLog::can_be_retried_after(
288            locked_execution.temporary_event_count + 1,
289            locked_execution.max_retries,
290            locked_execution.retry_exp_backoff,
291        );
292        let unlock_expiry_on_limit_reached =
293            ExecutionLog::compute_retry_duration_when_retrying_forever(
294                locked_execution.temporary_event_count + 1,
295                locked_execution.retry_exp_backoff,
296            );
297        let ctx = WorkerContext {
298            execution_id: locked_execution.execution_id.clone(),
299            metadata: locked_execution.metadata,
300            ffqn: locked_execution.ffqn,
301            params: locked_execution.params,
302            event_history: locked_execution.event_history,
303            responses: locked_execution
304                .responses
305                .into_iter()
306                .map(|outer| outer.event)
307                .collect(),
308            version: locked_execution.version,
309            execution_deadline,
310            can_be_retried: can_be_retried.is_some(),
311            run_id: locked_execution.run_id,
312            worker_span,
313        };
314        let worker_result = worker.run(ctx).await;
315        trace!(?worker_result, "Worker::run finished");
316        let result_obtained_at = clock_fn.now();
317        match Self::worker_result_to_execution_event(
318            locked_execution.execution_id,
319            worker_result,
320            result_obtained_at,
321            locked_execution.parent,
322            can_be_retried,
323            unlock_expiry_on_limit_reached,
324        )? {
325            Some(append) => {
326                let db_connection = db_pool.connection();
327                trace!("Appending {append:?}");
328                append.append(db_connection.as_ref()).await
329            }
330            None => Ok(()),
331        }
332    }
333
334    // FIXME: On a slow execution: race between `expired_timers_watcher` this if retry_exp_backoff is 0.
335    /// Map the `WorkerError` to an temporary or a permanent failure.
336    fn worker_result_to_execution_event(
337        execution_id: ExecutionId,
338        worker_result: WorkerResult,
339        result_obtained_at: DateTime<Utc>,
340        parent: Option<(ExecutionId, JoinSetId)>,
341        can_be_retried: Option<Duration>,
342        unlock_expiry_on_limit_reached: Duration,
343    ) -> Result<Option<Append>, DbError> {
344        Ok(match worker_result {
345            WorkerResult::Ok(result, new_version, http_client_traces) => {
346                info!(
347                    "Execution finished: {}",
348                    result.as_pending_state_finished_result()
349                );
350                let child_finished =
351                    parent.map(
352                        |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
353                            parent_execution_id,
354                            parent_join_set,
355                            result: Ok(result.clone()),
356                        },
357                    );
358                let primary_event = AppendRequest {
359                    created_at: result_obtained_at,
360                    event: ExecutionEventInner::Finished {
361                        result: Ok(result),
362                        http_client_traces,
363                    },
364                };
365
366                Some(Append {
367                    created_at: result_obtained_at,
368                    primary_event,
369                    execution_id,
370                    version: new_version,
371                    child_finished,
372                })
373            }
374            WorkerResult::DbUpdatedByWorkerOrWatcher => None,
375            WorkerResult::Err(err) => {
376                let reason_full = err.to_string(); // WorkerError.display() usually contains a variant specific prefix + inner `reason`.
377                let (primary_event, child_finished, version) = match err {
378                    WorkerError::TemporaryTimeout {
379                        http_client_traces,
380                        version,
381                    } => {
382                        if let Some(duration) = can_be_retried {
383                            let backoff_expires_at = result_obtained_at + duration;
384                            info!(
385                                "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
386                            );
387                            (
388                                ExecutionEventInner::TemporarilyTimedOut {
389                                    backoff_expires_at,
390                                    http_client_traces,
391                                },
392                                None,
393                                version,
394                            )
395                        } else {
396                            info!("Permanent timeout");
397                            let result = Err(FinishedExecutionError::PermanentTimeout);
398                            let child_finished =
399                                parent.map(|(parent_execution_id, parent_join_set)| {
400                                    ChildFinishedResponse {
401                                        parent_execution_id,
402                                        parent_join_set,
403                                        result: result.clone(),
404                                    }
405                                });
406                            (
407                                ExecutionEventInner::Finished {
408                                    result,
409                                    http_client_traces,
410                                },
411                                child_finished,
412                                version,
413                            )
414                        }
415                    }
416                    WorkerError::DbError(db_error) => {
417                        return Err(db_error);
418                    }
419                    WorkerError::ActivityTrap {
420                        reason: reason_inner,
421                        trap_kind,
422                        detail,
423                        version,
424                        http_client_traces,
425                    } => retry_or_fail(
426                        result_obtained_at,
427                        parent,
428                        can_be_retried,
429                        reason_full,
430                        reason_inner,
431                        trap_kind, // short reason like `trap` for logs
432                        Some(detail),
433                        version,
434                        http_client_traces,
435                    ),
436                    WorkerError::ActivityPreopenedDirError {
437                        reason_kind,
438                        reason_inner,
439                        version,
440                    } => retry_or_fail(
441                        result_obtained_at,
442                        parent,
443                        can_be_retried,
444                        reason_full,
445                        reason_inner,
446                        reason_kind, // short reason for logs
447                        None,        // detail
448                        version,
449                        None, // http_client_traces
450                    ),
451                    WorkerError::ActivityReturnedError {
452                        detail,
453                        version,
454                        http_client_traces,
455                    } => {
456                        let duration = can_be_retried.expect(
457                            "ActivityReturnedError must not be returned when retries are exhausted",
458                        );
459                        let expires_at = result_obtained_at + duration;
460                        debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
461                        (
462                            ExecutionEventInner::TemporarilyFailed {
463                                backoff_expires_at: expires_at,
464                                reason_full: StrVariant::Static("activity returned error"), // is same as the variant's display message.
465                                reason_inner: StrVariant::Static("activity returned error"),
466                                detail, // contains the backtrace
467                                http_client_traces,
468                            },
469                            None,
470                            version,
471                        )
472                    }
473                    WorkerError::TemporaryWorkflowTrap {
474                        reason: reason_inner,
475                        kind,
476                        detail,
477                        version,
478                    } => {
479                        let duration = can_be_retried.expect("workflows are retried forever");
480                        let expires_at = result_obtained_at + duration;
481                        debug!(
482                            "Retrying workflow {kind} execution after {duration:?} at {expires_at}"
483                        );
484                        (
485                            ExecutionEventInner::TemporarilyFailed {
486                                backoff_expires_at: expires_at,
487                                reason_full: StrVariant::from(reason_full),
488                                reason_inner: StrVariant::from(reason_inner),
489                                detail,
490                                http_client_traces: None,
491                            },
492                            None,
493                            version,
494                        )
495                    }
496                    WorkerError::LimitReached {
497                        reason: inner_reason,
498                        version: new_version,
499                    } => {
500                        let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
501                        warn!(
502                            "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
503                        );
504                        (
505                            ExecutionEventInner::Unlocked {
506                                backoff_expires_at: expires_at,
507                                reason: StrVariant::from(reason_full),
508                            },
509                            None,
510                            new_version,
511                        )
512                    }
513                    WorkerError::FatalError(fatal_error, version) => {
514                        info!("Fatal worker error - {fatal_error:?}");
515                        let result = Err(FinishedExecutionError::from(fatal_error));
516                        let child_finished =
517                            parent.map(|(parent_execution_id, parent_join_set)| {
518                                ChildFinishedResponse {
519                                    parent_execution_id,
520                                    parent_join_set,
521                                    result: result.clone(),
522                                }
523                            });
524                        (
525                            ExecutionEventInner::Finished {
526                                result,
527                                http_client_traces: None,
528                            },
529                            child_finished,
530                            version,
531                        )
532                    }
533                };
534                Some(Append {
535                    created_at: result_obtained_at,
536                    primary_event: AppendRequest {
537                        created_at: result_obtained_at,
538                        event: primary_event,
539                    },
540                    execution_id,
541                    version,
542                    child_finished,
543                })
544            }
545        })
546    }
547}
548
549#[expect(clippy::too_many_arguments)]
550fn retry_or_fail(
551    result_obtained_at: DateTime<Utc>,
552    parent: Option<(ExecutionId, JoinSetId)>,
553    can_be_retried: Option<Duration>,
554    reason_full: String,
555    reason_inner: String,
556    err_kind_for_logs: impl Display,
557    detail: Option<String>,
558    version: Version,
559    http_client_traces: Option<Vec<concepts::storage::http_client_trace::HttpClientTrace>>,
560) -> (ExecutionEventInner, Option<ChildFinishedResponse>, Version) {
561    if let Some(duration) = can_be_retried {
562        let expires_at = result_obtained_at + duration;
563        debug!(
564            "Retrying activity with `{err_kind_for_logs}` execution after {duration:?} at {expires_at}"
565        );
566        (
567            ExecutionEventInner::TemporarilyFailed {
568                backoff_expires_at: expires_at,
569                reason_full: StrVariant::from(reason_full),
570                reason_inner: StrVariant::from(reason_inner),
571                detail,
572                http_client_traces,
573            },
574            None,
575            version,
576        )
577    } else {
578        info!("Activity with `{err_kind_for_logs}` marked as permanent failure - {reason_inner}");
579        let result = Err(FinishedExecutionError::PermanentFailure {
580            reason_inner,
581            reason_full,
582            kind: PermanentFailureKind::ActivityTrap,
583            detail,
584        });
585        let child_finished =
586            parent.map(
587                |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
588                    parent_execution_id,
589                    parent_join_set,
590                    result: result.clone(),
591                },
592            );
593        (
594            ExecutionEventInner::Finished {
595                result,
596                http_client_traces,
597            },
598            child_finished,
599            version,
600        )
601    }
602}
603
/// Data needed to notify a parent execution that its child has finished.
#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    /// Execution that receives the response.
    pub(crate) parent_execution_id: ExecutionId,
    /// Join set of the parent to which the response is addressed.
    pub(crate) parent_join_set: JoinSetId,
    /// Result of the finished child execution.
    pub(crate) result: FinishedExecutionResult,
}
610
611#[derive(Debug, Clone)]
612pub(crate) struct Append {
613    pub(crate) created_at: DateTime<Utc>,
614    pub(crate) primary_event: AppendRequest,
615    pub(crate) execution_id: ExecutionId,
616    pub(crate) version: Version,
617    pub(crate) child_finished: Option<ChildFinishedResponse>,
618}
619
620impl Append {
621    pub(crate) async fn append(self, db_connection: &dyn DbConnection) -> Result<(), DbError> {
622        if let Some(child_finished) = self.child_finished {
623            assert_matches!(
624                &self.primary_event,
625                AppendRequest {
626                    event: ExecutionEventInner::Finished { .. },
627                    ..
628                }
629            );
630            let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
631            db_connection
632                .append_batch_respond_to_parent(
633                    derived.clone(),
634                    self.created_at,
635                    vec![self.primary_event],
636                    self.version.clone(),
637                    child_finished.parent_execution_id,
638                    JoinSetResponseEventOuter {
639                        created_at: self.created_at,
640                        event: JoinSetResponseEvent {
641                            join_set_id: child_finished.parent_join_set,
642                            event: JoinSetResponse::ChildExecutionFinished {
643                                child_execution_id: derived,
644                                // Since self.primary_event is a finished event, the version will remain the same.
645                                finished_version: self.version,
646                                result: child_finished.result,
647                            },
648                        },
649                    },
650                )
651                .await?;
652        } else {
653            db_connection
654                .append_batch(
655                    self.created_at,
656                    vec![self.primary_event],
657                    self.execution_id,
658                    self.version,
659                )
660                .await?;
661        }
662        Ok(())
663    }
664}
665
/// Scripted worker for tests: every run returns a pre-recorded result.
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    /// FFQN exported by `SimpleWorker` unless replaced via `SimpleWorker::with_ffqn`.
    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");
    /// Scripted runs keyed by the expected version; each value holds the expected
    /// event history and the result to return. Runs are consumed from the end of the map.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        /// Remaining scripted runs; the last entry is used by the next `run` call.
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    /// Builds the single-element export list for `ffqn`.
    fn single_export(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
        [FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: None,
            extension: None,
            submittable: true,
        }]
    }

    impl SimpleWorker {
        /// Worker scripted with a single run that expects version 2 and an empty event history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let scripted = IndexMap::from([(Version::new(2), (vec![], res))]);
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(scripted)))
        }

        /// Replaces the exported function with `ffqn`, keeping the scripted runs.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            let Self {
                worker_results_rev, ..
            } = self;
            Self {
                worker_results_rev,
                exported: single_export(ffqn.clone()),
                ffqn,
            }
        }

        /// Worker exporting `FFQN_SOME` that replays the given scripted runs.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: single_export(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        /// Pops the last scripted run, checks that the context carries the expected
        /// version and event history, and returns the scripted result.
        ///
        /// Panics when no scripted run is left or when the expectations do not match.
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            let (expected_version, (expected_eh, worker_result)) =
                self.worker_results_rev.lock().unwrap().pop().unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}
745
746#[cfg(test)]
747mod tests {
748    use self::simple_worker::SimpleWorker;
749    use super::*;
750    use crate::worker::FatalError;
751    use crate::{expired_timers_watcher, worker::WorkerResult};
752    use assert_matches::assert_matches;
753    use async_trait::async_trait;
754    use concepts::storage::{CreateRequest, JoinSetRequest};
755    use concepts::storage::{ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState};
756    use concepts::time::Now;
757    use concepts::{
758        ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, StrVariant,
759        SupportedFunctionReturnValue, TrapKind,
760    };
761    use db_tests::Database;
762    use indexmap::IndexMap;
763    use simple_worker::FFQN_SOME;
764    use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
765    use test_utils::set_up;
766    use test_utils::sim_clock::{ConstClock, SimClock};
767
    /// FFQN `fn-child` in the `pkg/ifc` interface (presumably used by tests involving child executions).
    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
769
770    async fn tick_fn<W: Worker + Debug, C: ClockFn + 'static>(
771        config: ExecConfig,
772        clock_fn: C,
773        db_pool: Arc<dyn DbPool>,
774        worker: Arc<W>,
775        executed_at: DateTime<Utc>,
776    ) -> ExecutionProgress {
777        trace!("Ticking with {worker:?}");
778        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
779        let executor = ExecTask::new(worker, config, clock_fn, db_pool, ffqns);
780        let mut execution_progress = executor.tick(executed_at, RunId::generate()).await.unwrap();
781        loop {
782            execution_progress
783                .executions
784                .retain(|(_, abort_handle)| !abort_handle.is_finished());
785            if execution_progress.executions.is_empty() {
786                return execution_progress;
787            }
788            tokio::time::sleep(Duration::from_millis(10)).await;
789        }
790    }
791
792    #[tokio::test]
793    async fn execute_simple_lifecycle_tick_based_mem() {
794        let created_at = Now.now();
795        let (_guard, db_pool) = Database::Memory.set_up().await;
796        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
797        db_pool.close().await.unwrap();
798    }
799
800    #[tokio::test]
801    async fn execute_simple_lifecycle_tick_based_sqlite() {
802        let created_at = Now.now();
803        let (_guard, db_pool) = Database::Sqlite.set_up().await;
804        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
805        db_pool.close().await.unwrap();
806    }
807
808    async fn execute_simple_lifecycle_tick_based<C: ClockFn + 'static>(
809        pool: Arc<dyn DbPool>,
810        clock_fn: C,
811    ) {
812        set_up();
813        let created_at = clock_fn.now();
814        let exec_config = ExecConfig {
815            batch_size: 1,
816            lock_expiry: Duration::from_secs(1),
817            tick_sleep: Duration::from_millis(100),
818            component_id: ComponentId::dummy_activity(),
819            task_limiter: None,
820            executor_id: ExecutorId::generate(),
821        };
822
823        let execution_log = create_and_tick(
824            CreateAndTickConfig {
825                execution_id: ExecutionId::generate(),
826                created_at,
827                max_retries: 0,
828                executed_at: created_at,
829                retry_exp_backoff: Duration::ZERO,
830            },
831            clock_fn,
832            pool,
833            exec_config,
834            Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
835                SupportedFunctionReturnValue::None,
836                Version::new(2),
837                None,
838            ))),
839            tick_fn,
840        )
841        .await;
842        assert_matches!(
843            execution_log.events.get(2).unwrap(),
844            ExecutionEvent {
845                event: ExecutionEventInner::Finished {
846                    result: Ok(SupportedFunctionReturnValue::None),
847                    http_client_traces: None
848                },
849                created_at: _,
850                backtrace_id: None,
851            }
852        );
853    }
854
855    #[tokio::test]
856    async fn stochastic_execute_simple_lifecycle_task_based_mem() {
857        set_up();
858        let created_at = Now.now();
859        let clock_fn = ConstClock(created_at);
860        let (_guard, db_pool) = Database::Memory.set_up().await;
861        let exec_config = ExecConfig {
862            batch_size: 1,
863            lock_expiry: Duration::from_secs(1),
864            tick_sleep: Duration::ZERO,
865            component_id: ComponentId::dummy_activity(),
866            task_limiter: None,
867            executor_id: ExecutorId::generate(),
868        };
869
870        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
871            SupportedFunctionReturnValue::None,
872            Version::new(2),
873            None,
874        )));
875        let exec_task = ExecTask::spawn_new(
876            worker.clone(),
877            exec_config.clone(),
878            clock_fn,
879            db_pool.clone(),
880        );
881
882        let execution_log = create_and_tick(
883            CreateAndTickConfig {
884                execution_id: ExecutionId::generate(),
885                created_at,
886                max_retries: 0,
887                executed_at: created_at,
888                retry_exp_backoff: Duration::ZERO,
889            },
890            clock_fn,
891            db_pool.clone(),
892            exec_config,
893            worker,
894            |_, _, _, _, _| async {
895                tokio::time::sleep(Duration::from_secs(1)).await; // FIXME: non determinism, possible race
896                ExecutionProgress::default()
897            },
898        )
899        .await;
900        exec_task.close().await;
901        db_pool.close().await.unwrap();
902        assert_matches!(
903            execution_log.events.get(2).unwrap(),
904            ExecutionEvent {
905                event: ExecutionEventInner::Finished {
906                    result: Ok(SupportedFunctionReturnValue::None),
907                    http_client_traces: None
908                },
909                created_at: _,
910                backtrace_id: None,
911            }
912        );
913    }
914
915    struct CreateAndTickConfig {
916        execution_id: ExecutionId,
917        created_at: DateTime<Utc>,
918        max_retries: u32,
919        executed_at: DateTime<Utc>,
920        retry_exp_backoff: Duration,
921    }
922
923    async fn create_and_tick<
924        W: Worker,
925        C: ClockFn,
926        T: FnMut(ExecConfig, C, Arc<dyn DbPool>, Arc<W>, DateTime<Utc>) -> F,
927        F: Future<Output = ExecutionProgress>,
928    >(
929        config: CreateAndTickConfig,
930        clock_fn: C,
931        db_pool: Arc<dyn DbPool>,
932        exec_config: ExecConfig,
933        worker: Arc<W>,
934        mut tick: T,
935    ) -> ExecutionLog {
936        // Create an execution
937        let db_connection = db_pool.connection();
938        db_connection
939            .create(CreateRequest {
940                created_at: config.created_at,
941                execution_id: config.execution_id.clone(),
942                ffqn: FFQN_SOME,
943                params: Params::empty(),
944                parent: None,
945                metadata: concepts::ExecutionMetadata::empty(),
946                scheduled_at: config.created_at,
947                retry_exp_backoff: config.retry_exp_backoff,
948                max_retries: config.max_retries,
949                component_id: ComponentId::dummy_activity(),
950                scheduled_by: None,
951            })
952            .await
953            .unwrap();
954        // execute!
955        tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
956        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
957        debug!("Execution history after tick: {execution_log:?}");
958        // check that DB contains Created and Locked events.
959        assert_matches!(
960            execution_log.events.first().unwrap(),
961            ExecutionEvent {
962                event: ExecutionEventInner::Created { .. },
963                created_at: actually_created_at,
964                backtrace_id: None,
965            }
966            if config.created_at == *actually_created_at
967        );
968        let locked_at = assert_matches!(
969            execution_log.events.get(1).unwrap(),
970            ExecutionEvent {
971                event: ExecutionEventInner::Locked { .. },
972                created_at: locked_at,
973                backtrace_id: None,
974            } if config.created_at <= *locked_at
975            => *locked_at
976        );
977        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
978            event: _,
979            created_at: executed_at,
980            backtrace_id: None,
981        } if *executed_at >= locked_at);
982        execution_log
983    }
984
985    #[tokio::test]
986    async fn activity_trap_should_trigger_an_execution_retry() {
987        set_up();
988        let sim_clock = SimClock::default();
989        let (_guard, db_pool) = Database::Memory.set_up().await;
990        let exec_config = ExecConfig {
991            batch_size: 1,
992            lock_expiry: Duration::from_secs(1),
993            tick_sleep: Duration::ZERO,
994            component_id: ComponentId::dummy_activity(),
995            task_limiter: None,
996            executor_id: ExecutorId::generate(),
997        };
998        let expected_reason = "error reason";
999        let expected_detail = "error detail";
1000        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1001            WorkerError::ActivityTrap {
1002                reason: expected_reason.to_string(),
1003                trap_kind: concepts::TrapKind::Trap,
1004                detail: expected_detail.to_string(),
1005                version: Version::new(2),
1006                http_client_traces: None,
1007            },
1008        )));
1009        let retry_exp_backoff = Duration::from_millis(100);
1010        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
1011        let execution_log = create_and_tick(
1012            CreateAndTickConfig {
1013                execution_id: ExecutionId::generate(),
1014                created_at: sim_clock.now(),
1015                max_retries: 1,
1016                executed_at: sim_clock.now(),
1017                retry_exp_backoff,
1018            },
1019            sim_clock.clone(),
1020            db_pool.clone(),
1021            exec_config.clone(),
1022            worker,
1023            tick_fn,
1024        )
1025        .await;
1026        assert_eq!(3, execution_log.events.len());
1027        {
1028            let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
1029                &execution_log.events.get(2).unwrap(),
1030                ExecutionEvent {
1031                    event: ExecutionEventInner::TemporarilyFailed {
1032                        reason_inner,
1033                        reason_full,
1034                        detail,
1035                        backoff_expires_at,
1036                        http_client_traces: None,
1037                    },
1038                    created_at: at,
1039                    backtrace_id: None,
1040                }
1041                => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
1042            );
1043            assert_eq!(expected_reason, reason_inner.deref());
1044            assert_eq!(
1045                format!("activity trap: {expected_reason}"),
1046                reason_full.deref()
1047            );
1048            assert_eq!(Some(expected_detail), detail.as_deref());
1049            assert_eq!(at, sim_clock.now());
1050            assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
1051        }
1052        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
1053            std::sync::Mutex::new(IndexMap::from([(
1054                Version::new(4),
1055                (
1056                    vec![],
1057                    WorkerResult::Ok(SupportedFunctionReturnValue::None, Version::new(4), None),
1058                ),
1059            )])),
1060        )));
1061        // noop until `retry_exp_backoff` expires
1062        assert!(
1063            tick_fn(
1064                exec_config.clone(),
1065                sim_clock.clone(),
1066                db_pool.clone(),
1067                worker.clone(),
1068                sim_clock.now(),
1069            )
1070            .await
1071            .executions
1072            .is_empty()
1073        );
1074        // tick again to finish the execution
1075        sim_clock.move_time_forward(retry_exp_backoff);
1076        tick_fn(
1077            exec_config,
1078            sim_clock.clone(),
1079            db_pool.clone(),
1080            worker,
1081            sim_clock.now(),
1082        )
1083        .await;
1084        let execution_log = {
1085            let db_connection = db_pool.connection();
1086            db_connection
1087                .get(&execution_log.execution_id)
1088                .await
1089                .unwrap()
1090        };
1091        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
1092        assert_matches!(
1093            execution_log.events.get(3).unwrap(),
1094            ExecutionEvent {
1095                event: ExecutionEventInner::Locked { .. },
1096                created_at: at,
1097                backtrace_id: None,
1098            } if *at == sim_clock.now()
1099        );
1100        assert_matches!(
1101            execution_log.events.get(4).unwrap(),
1102            ExecutionEvent {
1103                event: ExecutionEventInner::Finished {
1104                    result: Ok(SupportedFunctionReturnValue::None),
1105                    http_client_traces: None
1106                },
1107                created_at: finished_at,
1108                backtrace_id: None,
1109            } if *finished_at == sim_clock.now()
1110        );
1111        db_pool.close().await.unwrap();
1112    }
1113
1114    #[tokio::test]
1115    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1116        set_up();
1117        let created_at = Now.now();
1118        let clock_fn = ConstClock(created_at);
1119        let (_guard, db_pool) = Database::Memory.set_up().await;
1120        let exec_config = ExecConfig {
1121            batch_size: 1,
1122            lock_expiry: Duration::from_secs(1),
1123            tick_sleep: Duration::ZERO,
1124            component_id: ComponentId::dummy_activity(),
1125            task_limiter: None,
1126            executor_id: ExecutorId::generate(),
1127        };
1128
1129        let expected_reason = "error reason";
1130        let expected_detail = "error detail";
1131        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1132            WorkerError::ActivityTrap {
1133                reason: expected_reason.to_string(),
1134                trap_kind: concepts::TrapKind::Trap,
1135                detail: expected_detail.to_string(),
1136                version: Version::new(2),
1137                http_client_traces: None,
1138            },
1139        )));
1140        let execution_log = create_and_tick(
1141            CreateAndTickConfig {
1142                execution_id: ExecutionId::generate(),
1143                created_at,
1144                max_retries: 0,
1145                executed_at: created_at,
1146                retry_exp_backoff: Duration::ZERO,
1147            },
1148            clock_fn,
1149            db_pool.clone(),
1150            exec_config.clone(),
1151            worker,
1152            tick_fn,
1153        )
1154        .await;
1155        assert_eq!(3, execution_log.events.len());
1156        let (reason, kind, detail) = assert_matches!(
1157            &execution_log.events.get(2).unwrap(),
1158            ExecutionEvent {
1159                event: ExecutionEventInner::Finished{
1160                    result: Err(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_}),
1161                    http_client_traces: None
1162                },
1163                created_at: at,
1164                backtrace_id: None,
1165            } if *at == created_at
1166            => (reason_inner, kind, detail)
1167        );
1168        assert_eq!(expected_reason, *reason);
1169        assert_eq!(Some(expected_detail), detail.as_deref());
1170        assert_eq!(PermanentFailureKind::ActivityTrap, *kind);
1171
1172        db_pool.close().await.unwrap();
1173    }
1174
1175    #[tokio::test]
1176    async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1177        let worker_error = WorkerError::ActivityTrap {
1178            reason: "error reason".to_string(),
1179            trap_kind: TrapKind::Trap,
1180            detail: "detail".to_string(),
1181            version: Version::new(2),
1182            http_client_traces: None,
1183        };
1184        let expected_child_err = FinishedExecutionError::PermanentFailure {
1185            reason_full: "activity trap: error reason".to_string(),
1186            reason_inner: "error reason".to_string(),
1187            kind: PermanentFailureKind::ActivityTrap,
1188            detail: Some("detail".to_string()),
1189        };
1190        child_execution_permanently_failed_should_notify_parent(
1191            WorkerResult::Err(worker_error),
1192            expected_child_err,
1193        )
1194        .await;
1195    }
1196
1197    // TODO: Add test for WorkerError::TemporaryTimeout
1198
1199    #[tokio::test]
1200    async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1201        let expected_child_err = FinishedExecutionError::PermanentTimeout;
1202        child_execution_permanently_failed_should_notify_parent(
1203            WorkerResult::DbUpdatedByWorkerOrWatcher,
1204            expected_child_err,
1205        )
1206        .await;
1207    }
1208
1209    #[tokio::test]
1210    async fn child_execution_permanently_failed_should_notify_parent_unhandled_child() {
1211        let parent_id = ExecutionId::from_parts(1, 1);
1212        let join_set_id_outer =
1213            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("outer")).unwrap();
1214        let root_cause_id = parent_id.next_level(&join_set_id_outer);
1215        let join_set_id_inner =
1216            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("inner")).unwrap();
1217        let child_execution_id = root_cause_id.next_level(&join_set_id_inner);
1218        let worker_error = WorkerError::FatalError(
1219            FatalError::UnhandledChildExecutionError {
1220                child_execution_id: child_execution_id.clone(),
1221                root_cause_id: root_cause_id.clone(),
1222            },
1223            Version::new(2),
1224        );
1225        let expected_child_err = FinishedExecutionError::UnhandledChildExecutionError {
1226            child_execution_id,
1227            root_cause_id,
1228        };
1229        child_execution_permanently_failed_should_notify_parent(
1230            WorkerResult::Err(worker_error),
1231            expected_child_err,
1232        )
1233        .await;
1234    }
1235
1236    async fn child_execution_permanently_failed_should_notify_parent(
1237        worker_result: WorkerResult,
1238        expected_child_err: FinishedExecutionError,
1239    ) {
1240        use concepts::storage::JoinSetResponseEventOuter;
1241        const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1242
1243        set_up();
1244        let sim_clock = SimClock::default();
1245        let (_guard, db_pool) = Database::Memory.set_up().await;
1246
1247        let parent_worker = Arc::new(SimpleWorker::with_single_result(
1248            WorkerResult::DbUpdatedByWorkerOrWatcher,
1249        ));
1250        let parent_execution_id = ExecutionId::generate();
1251        db_pool
1252            .connection()
1253            .create(CreateRequest {
1254                created_at: sim_clock.now(),
1255                execution_id: parent_execution_id.clone(),
1256                ffqn: FFQN_SOME,
1257                params: Params::empty(),
1258                parent: None,
1259                metadata: concepts::ExecutionMetadata::empty(),
1260                scheduled_at: sim_clock.now(),
1261                retry_exp_backoff: Duration::ZERO,
1262                max_retries: 0,
1263                component_id: ComponentId::dummy_activity(),
1264                scheduled_by: None,
1265            })
1266            .await
1267            .unwrap();
1268        tick_fn(
1269            ExecConfig {
1270                batch_size: 1,
1271                lock_expiry: LOCK_EXPIRY,
1272                tick_sleep: Duration::ZERO,
1273                component_id: ComponentId::dummy_activity(),
1274                task_limiter: None,
1275                executor_id: ExecutorId::generate(),
1276            },
1277            sim_clock.clone(),
1278            db_pool.clone(),
1279            parent_worker,
1280            sim_clock.now(),
1281        )
1282        .await;
1283
1284        let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1285        let child_execution_id = parent_execution_id.next_level(&join_set_id);
1286        // executor does not append anything, this should have been written by the worker:
1287        {
1288            let child = CreateRequest {
1289                created_at: sim_clock.now(),
1290                execution_id: ExecutionId::Derived(child_execution_id.clone()),
1291                ffqn: FFQN_CHILD,
1292                params: Params::empty(),
1293                parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1294                metadata: concepts::ExecutionMetadata::empty(),
1295                scheduled_at: sim_clock.now(),
1296                retry_exp_backoff: Duration::ZERO,
1297                max_retries: 0,
1298                component_id: ComponentId::dummy_activity(),
1299                scheduled_by: None,
1300            };
1301            let current_time = sim_clock.now();
1302            let join_set = AppendRequest {
1303                created_at: current_time,
1304                event: ExecutionEventInner::HistoryEvent {
1305                    event: HistoryEvent::JoinSetCreate {
1306                        join_set_id: join_set_id.clone(),
1307                        closing_strategy: ClosingStrategy::Complete,
1308                    },
1309                },
1310            };
1311            let child_exec_req = AppendRequest {
1312                created_at: current_time,
1313                event: ExecutionEventInner::HistoryEvent {
1314                    event: HistoryEvent::JoinSetRequest {
1315                        join_set_id: join_set_id.clone(),
1316                        request: JoinSetRequest::ChildExecutionRequest {
1317                            child_execution_id: child_execution_id.clone(),
1318                        },
1319                    },
1320                },
1321            };
1322            let join_next = AppendRequest {
1323                created_at: current_time,
1324                event: ExecutionEventInner::HistoryEvent {
1325                    event: HistoryEvent::JoinNext {
1326                        join_set_id: join_set_id.clone(),
1327                        run_expires_at: sim_clock.now(),
1328                        closing: false,
1329                    },
1330                },
1331            };
1332            db_pool
1333                .connection()
1334                .append_batch_create_new_execution(
1335                    current_time,
1336                    vec![join_set, child_exec_req, join_next],
1337                    parent_execution_id.clone(),
1338                    Version::new(2),
1339                    vec![child],
1340                )
1341                .await
1342                .unwrap();
1343        }
1344
1345        let child_worker =
1346            Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));
1347
1348        // execute the child
1349        tick_fn(
1350            ExecConfig {
1351                batch_size: 1,
1352                lock_expiry: LOCK_EXPIRY,
1353                tick_sleep: Duration::ZERO,
1354                component_id: ComponentId::dummy_activity(),
1355                task_limiter: None,
1356                executor_id: ExecutorId::generate(),
1357            },
1358            sim_clock.clone(),
1359            db_pool.clone(),
1360            child_worker,
1361            sim_clock.now(),
1362        )
1363        .await;
1364        if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
1365            // In case of timeout, let the timers watcher handle it
1366            sim_clock.move_time_forward(LOCK_EXPIRY);
1367            expired_timers_watcher::tick(db_pool.connection().as_ref(), sim_clock.now())
1368                .await
1369                .unwrap();
1370        }
1371        let child_log = db_pool
1372            .connection()
1373            .get(&ExecutionId::Derived(child_execution_id.clone()))
1374            .await
1375            .unwrap();
1376        assert!(child_log.pending_state.is_finished());
1377        assert_eq!(
1378            Version(2),
1379            child_log.next_version,
1380            "created = 0, locked = 1, with_single_result = 2"
1381        );
1382        assert_eq!(
1383            ExecutionEventInner::Finished {
1384                result: Err(expected_child_err),
1385                http_client_traces: None
1386            },
1387            child_log.last_event().event
1388        );
1389        let parent_log = db_pool
1390            .connection()
1391            .get(&parent_execution_id)
1392            .await
1393            .unwrap();
1394        assert_matches!(
1395            parent_log.pending_state,
1396            PendingState::PendingAt {
1397                scheduled_at
1398            } if scheduled_at == sim_clock.now(),
1399            "parent should be back to pending"
1400        );
1401        let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1402            parent_log.responses.last(),
1403            Some(JoinSetResponseEventOuter{
1404                created_at: at,
1405                event: JoinSetResponseEvent{
1406                    join_set_id: found_join_set_id,
1407                    event: JoinSetResponse::ChildExecutionFinished {
1408                        child_execution_id: found_child_execution_id,
1409                        finished_version,
1410                        result: found_result,
1411                    }
1412                }
1413            })
1414             if *at == sim_clock.now()
1415            => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1416        );
1417        assert_eq!(join_set_id, *found_join_set_id);
1418        assert_eq!(child_execution_id, *found_child_execution_id);
1419        assert_eq!(child_log.next_version, *child_finished_version);
1420        assert!(found_result.is_err());
1421
1422        db_pool.close().await.unwrap();
1423    }
1424
1425    #[derive(Clone, Debug)]
1426    struct SleepyWorker {
1427        duration: Duration,
1428        result: SupportedFunctionReturnValue,
1429        exported: [FunctionMetadata; 1],
1430    }
1431
1432    #[async_trait]
1433    impl Worker for SleepyWorker {
1434        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1435            tokio::time::sleep(self.duration).await;
1436            WorkerResult::Ok(self.result.clone(), ctx.version, None)
1437        }
1438
1439        fn exported_functions(&self) -> &[FunctionMetadata] {
1440            &self.exported
1441        }
1442    }
1443
1444    #[tokio::test]
1445    async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1446        set_up();
1447        let sim_clock = SimClock::default();
1448        let (_guard, db_pool) = Database::Memory.set_up().await;
1449        let lock_expiry = Duration::from_millis(100);
1450        let exec_config = ExecConfig {
1451            batch_size: 1,
1452            lock_expiry,
1453            tick_sleep: Duration::ZERO,
1454            component_id: ComponentId::dummy_activity(),
1455            task_limiter: None,
1456            executor_id: ExecutorId::generate(),
1457        };
1458
1459        let worker = Arc::new(SleepyWorker {
1460            duration: lock_expiry + Duration::from_millis(1), // sleep more than allowed by the lock expiry
1461            result: SupportedFunctionReturnValue::None,
1462            exported: [FunctionMetadata {
1463                ffqn: FFQN_SOME,
1464                parameter_types: ParameterTypes::default(),
1465                return_type: None,
1466                extension: None,
1467                submittable: true,
1468            }],
1469        });
1470        // Create an execution
1471        let execution_id = ExecutionId::generate();
1472        let timeout_duration = Duration::from_millis(300);
1473        let db_connection = db_pool.connection();
1474        db_connection
1475            .create(CreateRequest {
1476                created_at: sim_clock.now(),
1477                execution_id: execution_id.clone(),
1478                ffqn: FFQN_SOME,
1479                params: Params::empty(),
1480                parent: None,
1481                metadata: concepts::ExecutionMetadata::empty(),
1482                scheduled_at: sim_clock.now(),
1483                retry_exp_backoff: timeout_duration,
1484                max_retries: 1,
1485                component_id: ComponentId::dummy_activity(),
1486                scheduled_by: None,
1487            })
1488            .await
1489            .unwrap();
1490
1491        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
1492        let executor = ExecTask::new(
1493            worker,
1494            exec_config,
1495            sim_clock.clone(),
1496            db_pool.clone(),
1497            ffqns,
1498        );
1499        let mut first_execution_progress = executor
1500            .tick(sim_clock.now(), RunId::generate())
1501            .await
1502            .unwrap();
1503        assert_eq!(1, first_execution_progress.executions.len());
1504        // Started hanging, wait for lock expiry.
1505        sim_clock.move_time_forward(lock_expiry);
1506        // cleanup should be called
1507        let now_after_first_lock_expiry = sim_clock.now();
1508        {
1509            debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1510            let cleanup_progress = executor
1511                .tick(now_after_first_lock_expiry, RunId::generate())
1512                .await
1513                .unwrap();
1514            assert!(cleanup_progress.executions.is_empty());
1515        }
1516        {
1517            let expired_locks = expired_timers_watcher::tick(
1518                db_pool.connection().as_ref(),
1519                now_after_first_lock_expiry,
1520            )
1521            .await
1522            .unwrap()
1523            .expired_locks;
1524            assert_eq!(1, expired_locks);
1525        }
1526        assert!(
1527            !first_execution_progress
1528                .executions
1529                .pop()
1530                .unwrap()
1531                .1
1532                .is_finished()
1533        );
1534
1535        let execution_log = db_connection.get(&execution_id).await.unwrap();
1536        let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1537        assert_matches!(
1538            &execution_log.events.get(2).unwrap(),
1539            ExecutionEvent {
1540                event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
1541                created_at: at,
1542                backtrace_id: None,
1543            } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1544        );
1545        assert_eq!(
1546            PendingState::PendingAt {
1547                scheduled_at: expected_first_timeout_expiry
1548            },
1549            execution_log.pending_state
1550        );
1551        sim_clock.move_time_forward(timeout_duration);
1552        let now_after_first_timeout = sim_clock.now();
1553        debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1554
1555        let mut second_execution_progress = executor
1556            .tick(now_after_first_timeout, RunId::generate())
1557            .await
1558            .unwrap();
1559        assert_eq!(1, second_execution_progress.executions.len());
1560
1561        // Started hanging, wait for lock expiry.
1562        sim_clock.move_time_forward(lock_expiry);
1563        // cleanup should be called
1564        let now_after_second_lock_expiry = sim_clock.now();
1565        debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1566        {
1567            let cleanup_progress = executor
1568                .tick(now_after_second_lock_expiry, RunId::generate())
1569                .await
1570                .unwrap();
1571            assert!(cleanup_progress.executions.is_empty());
1572        }
1573        {
1574            let expired_locks = expired_timers_watcher::tick(
1575                db_pool.connection().as_ref(),
1576                now_after_second_lock_expiry,
1577            )
1578            .await
1579            .unwrap()
1580            .expired_locks;
1581            assert_eq!(1, expired_locks);
1582        }
1583        assert!(
1584            !second_execution_progress
1585                .executions
1586                .pop()
1587                .unwrap()
1588                .1
1589                .is_finished()
1590        );
1591
1592        drop(db_connection);
1593        drop(executor);
1594        db_pool.close().await.unwrap();
1595    }
1596}