obeli_sk_executor/
executor.rs

1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::storage::{
5    AppendRequest, DbPool, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
6    LockedExecution,
7};
8use concepts::time::ClockFn;
9use concepts::{ComponentId, FinishedExecutionResult, FunctionMetadata, StrVariant};
10use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
11use concepts::{
12    FinishedExecutionError,
13    storage::{DbConnection, DbError, ExecutionEventInner, JoinSetResponse, Version},
14};
15use concepts::{JoinSetId, PermanentFailureKind};
16use std::fmt::Display;
17use std::marker::PhantomData;
18use std::{
19    sync::{
20        Arc,
21        atomic::{AtomicBool, Ordering},
22    },
23    time::Duration,
24};
25use tokio::task::{AbortHandle, JoinHandle};
26use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
27
/// Configuration of one executor task (consumed by `ExecTask::spawn_new` / `ExecTask::new`).
#[derive(Debug, Clone)]
pub struct ExecConfig {
    /// Added to each tick's timestamp to compute both the lock expiry passed to
    /// `lock_pending` and the `execution_deadline` handed to the worker.
    pub lock_expiry: Duration,
    /// Passed as the timeout argument of `wait_for_pending` between ticks.
    pub tick_sleep: Duration,
    /// Maximum number of executions locked in a single tick.
    pub batch_size: u32,
    /// Component id passed to `lock_pending` and recorded in tracing spans.
    pub component_id: ComponentId,
    /// Optional semaphore bounding how many worker tasks run at once; one permit is held per
    /// running worker task and released when it ends. `None` means no limit beyond `batch_size`.
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
}
36
/// Executor that locks pending executions of `ffqns` and runs each one on `worker`.
pub struct ExecTask<C: ClockFn, DB: DbConnection, P: DbPool<DB>> {
    /// Worker that executes the locked executions.
    worker: Arc<dyn Worker>,
    config: ExecConfig,
    clock_fn: C, // Used for obtaining current time when the execution finishes.
    /// Pool providing a fresh connection for each DB interaction.
    db_pool: P,
    /// Identity used when locking executions and in tracing spans.
    executor_id: ExecutorId,
    /// Ties the `DB` type parameter to the struct; no `DB` value is stored.
    phantom_data: PhantomData<DB>,
    /// Functions whose executions this executor locks (passed to `lock_pending`).
    ffqns: Arc<[FunctionFqn]>,
}
46
47#[derive(derive_more::Debug, Default)]
48pub struct ExecutionProgress {
49    #[debug(skip)]
50    #[allow(dead_code)]
51    executions: Vec<(ExecutionId, JoinHandle<()>)>,
52}
53
54impl ExecutionProgress {
55    #[cfg(feature = "test")]
56    pub async fn wait_for_tasks(self) -> Result<usize, tokio::task::JoinError> {
57        let execs = self.executions.len();
58        for (_, handle) in self.executions {
59            handle.await?;
60        }
61        Ok(execs)
62    }
63}
64
/// Handle to the executor loop started by `ExecTask::spawn_new`.
///
/// `close` requests a graceful stop and waits for it; dropping the handle while the task is
/// still running aborts it.
#[derive(derive_more::Debug)]
pub struct ExecutorTaskHandle {
    /// Set by `close`; the executor loop checks it after each `wait_for_pending` call.
    #[debug(skip)]
    is_closing: Arc<AtomicBool>,
    /// Abort handle of the spawned executor loop task.
    #[debug(skip)]
    abort_handle: AbortHandle,
    component_id: ComponentId,
    executor_id: ExecutorId,
}
74
75impl ExecutorTaskHandle {
76    #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
77    pub async fn close(&self) {
78        trace!("Gracefully closing");
79        self.is_closing.store(true, Ordering::Relaxed);
80        while !self.abort_handle.is_finished() {
81            tokio::time::sleep(Duration::from_millis(1)).await;
82        }
83        debug!("Gracefully closed");
84    }
85}
86
87impl Drop for ExecutorTaskHandle {
88    #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
89    fn drop(&mut self) {
90        if self.abort_handle.is_finished() {
91            return;
92        }
93        warn!(executor_id= %self.executor_id, component_id=%self.component_id, "Aborting the executor task");
94        self.abort_handle.abort();
95    }
96}
97
/// Test-feature entry point exposing the private `extract_exported_ffqns_noext`.
#[cfg(feature = "test")]
pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    self::extract_exported_ffqns_noext(worker)
}
102
103fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
104    worker
105        .exported_functions()
106        .iter()
107        .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
108        .collect::<Arc<_>>()
109}
110
111impl<C: ClockFn + 'static, DB: DbConnection + 'static, P: DbPool<DB> + 'static> ExecTask<C, DB, P> {
112    #[cfg(feature = "test")]
113    pub fn new(
114        worker: Arc<dyn Worker>,
115        config: ExecConfig,
116        clock_fn: C,
117        db_pool: P,
118        ffqns: Arc<[FunctionFqn]>,
119    ) -> Self {
120        Self {
121            worker,
122            config,
123            executor_id: ExecutorId::generate(),
124            db_pool,
125            phantom_data: PhantomData,
126            ffqns,
127            clock_fn,
128        }
129    }
130
131    pub fn spawn_new(
132        worker: Arc<dyn Worker>,
133        config: ExecConfig,
134        clock_fn: C,
135        db_pool: P,
136        executor_id: ExecutorId,
137    ) -> ExecutorTaskHandle {
138        let is_closing = Arc::new(AtomicBool::default());
139        let is_closing_inner = is_closing.clone();
140        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
141        let component_id = config.component_id.clone();
142        let abort_handle = tokio::spawn(async move {
143            debug!(%executor_id, component_id = %config.component_id, "Spawned executor");
144            let task = Self {
145                worker,
146                config,
147                executor_id,
148                db_pool,
149                phantom_data: PhantomData,
150                ffqns: ffqns.clone(),
151                clock_fn: clock_fn.clone(),
152            };
153            loop {
154                let _ = task.tick(clock_fn.now()).await;
155                let executed_at = clock_fn.now();
156                task.db_pool
157                    .connection()
158                    .wait_for_pending(executed_at, ffqns.clone(), task.config.tick_sleep)
159                    .await;
160                if is_closing_inner.load(Ordering::Relaxed) {
161                    return;
162                }
163            }
164        })
165        .abort_handle();
166        ExecutorTaskHandle {
167            is_closing,
168            abort_handle,
169            component_id,
170            executor_id,
171        }
172    }
173
174    fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
175        if let Some(task_limiter) = &self.config.task_limiter {
176            let mut locks = Vec::new();
177            for _ in 0..self.config.batch_size {
178                if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
179                    locks.push(Some(permit));
180                } else {
181                    break;
182                }
183            }
184            locks
185        } else {
186            let mut vec = Vec::with_capacity(self.config.batch_size as usize);
187            for _ in 0..self.config.batch_size {
188                vec.push(None);
189            }
190            vec
191        }
192    }
193
194    #[cfg(feature = "test")]
195    pub async fn tick_test(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
196        self.tick(executed_at).await
197    }
198
199    #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.executor_id, component_id = %self.config.component_id))]
200    async fn tick(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
201        let locked_executions = {
202            let mut permits = self.acquire_task_permits();
203            if permits.is_empty() {
204                return Ok(ExecutionProgress::default());
205            }
206            let db_connection = self.db_pool.connection();
207            let lock_expires_at = executed_at + self.config.lock_expiry;
208            let locked_executions = db_connection
209                .lock_pending(
210                    permits.len(), // batch size
211                    executed_at,   // fetch expiring before now
212                    self.ffqns.clone(),
213                    executed_at, // created at
214                    self.config.component_id.clone(),
215                    self.executor_id,
216                    lock_expires_at,
217                )
218                .await
219                .map_err(|err| {
220                    warn!(executor_id = %self.executor_id, component_id = %self.config.component_id, "lock_pending error {err:?}");
221                })?;
222            // Drop permits if too many were allocated.
223            while permits.len() > locked_executions.len() {
224                permits.pop();
225            }
226            assert_eq!(permits.len(), locked_executions.len());
227            locked_executions.into_iter().zip(permits)
228        };
229        let execution_deadline = executed_at + self.config.lock_expiry;
230
231        let mut executions = Vec::with_capacity(locked_executions.len());
232        for (locked_execution, permit) in locked_executions {
233            let execution_id = locked_execution.execution_id.clone();
234            let join_handle = {
235                let worker = self.worker.clone();
236                let db_pool = self.db_pool.clone();
237                let clock_fn = self.clock_fn.clone();
238                let run_id = locked_execution.run_id;
239                let worker_span = info_span!(parent: None, "worker",
240                    "otel.name" = format!("worker {}", locked_execution.ffqn),
241                    %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.executor_id, component_id = %self.config.component_id);
242                locked_execution.metadata.enrich(&worker_span);
243                tokio::spawn({
244                    let worker_span2 = worker_span.clone();
245                    async move {
246                        let res = Self::run_worker(
247                            worker,
248                            &db_pool,
249                            execution_deadline,
250                            clock_fn,
251                            locked_execution,
252                            worker_span2,
253                        )
254                        .await;
255                        if let Err(db_error) = res {
256                            error!("Execution will be timed out not writing `{db_error:?}`");
257                        }
258                        drop(permit);
259                    }
260                    .instrument(worker_span)
261                })
262            };
263            executions.push((execution_id, join_handle));
264        }
265        Ok(ExecutionProgress { executions })
266    }
267
268    async fn run_worker(
269        worker: Arc<dyn Worker>,
270        db_pool: &P,
271        execution_deadline: DateTime<Utc>,
272        clock_fn: C,
273        locked_execution: LockedExecution,
274        worker_span: Span,
275    ) -> Result<(), DbError> {
276        debug!("Worker::run starting");
277        trace!(
278            version = %locked_execution.version,
279            params = ?locked_execution.params,
280            event_history = ?locked_execution.event_history,
281            "Worker::run starting"
282        );
283        let can_be_retried = ExecutionLog::can_be_retried_after(
284            locked_execution.temporary_event_count + 1,
285            locked_execution.max_retries,
286            locked_execution.retry_exp_backoff,
287        );
288        let unlock_expiry_on_limit_reached =
289            ExecutionLog::compute_retry_duration_when_retrying_forever(
290                locked_execution.temporary_event_count + 1,
291                locked_execution.retry_exp_backoff,
292            );
293        let ctx = WorkerContext {
294            execution_id: locked_execution.execution_id.clone(),
295            metadata: locked_execution.metadata,
296            ffqn: locked_execution.ffqn,
297            params: locked_execution.params,
298            event_history: locked_execution.event_history,
299            responses: locked_execution
300                .responses
301                .into_iter()
302                .map(|outer| outer.event)
303                .collect(),
304            version: locked_execution.version,
305            execution_deadline,
306            can_be_retried: can_be_retried.is_some(),
307            run_id: locked_execution.run_id,
308            worker_span,
309        };
310        let worker_result = worker.run(ctx).await;
311        trace!(?worker_result, "Worker::run finished");
312        let result_obtained_at = clock_fn.now();
313        match Self::worker_result_to_execution_event(
314            locked_execution.execution_id,
315            worker_result,
316            result_obtained_at,
317            locked_execution.parent,
318            can_be_retried,
319            unlock_expiry_on_limit_reached,
320        )? {
321            Some(append) => {
322                let db_connection = db_pool.connection();
323                trace!("Appending {append:?}");
324                append.clone().append(&db_connection).await
325            }
326            None => Ok(()),
327        }
328    }
329
330    // FIXME: On a slow execution: race between `expired_timers_watcher` this if retry_exp_backoff is 0.
331    /// Map the `WorkerError` to an temporary or a permanent failure.
332    fn worker_result_to_execution_event(
333        execution_id: ExecutionId,
334        worker_result: WorkerResult,
335        result_obtained_at: DateTime<Utc>,
336        parent: Option<(ExecutionId, JoinSetId)>,
337        can_be_retried: Option<Duration>,
338        unlock_expiry_on_limit_reached: Duration,
339    ) -> Result<Option<Append>, DbError> {
340        Ok(match worker_result {
341            WorkerResult::Ok(result, new_version, http_client_traces) => {
342                info!(
343                    "Execution finished: {}",
344                    result.as_pending_state_finished_result()
345                );
346                let child_finished =
347                    parent.map(
348                        |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
349                            parent_execution_id,
350                            parent_join_set,
351                            result: Ok(result.clone()),
352                        },
353                    );
354                let primary_event = AppendRequest {
355                    created_at: result_obtained_at,
356                    event: ExecutionEventInner::Finished {
357                        result: Ok(result),
358                        http_client_traces,
359                    },
360                };
361
362                Some(Append {
363                    created_at: result_obtained_at,
364                    primary_event,
365                    execution_id,
366                    version: new_version,
367                    child_finished,
368                })
369            }
370            WorkerResult::DbUpdatedByWorkerOrWatcher => None,
371            WorkerResult::Err(err) => {
372                let reason_full = err.to_string(); // WorkerError.display() usually contains a variant specific prefix + inner `reason`.
373                let (primary_event, child_finished, version) = match err {
374                    WorkerError::TemporaryTimeout {
375                        http_client_traces,
376                        version,
377                    } => {
378                        if let Some(duration) = can_be_retried {
379                            let backoff_expires_at = result_obtained_at + duration;
380                            info!(
381                                "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
382                            );
383                            (
384                                ExecutionEventInner::TemporarilyTimedOut {
385                                    backoff_expires_at,
386                                    http_client_traces,
387                                },
388                                None,
389                                version,
390                            )
391                        } else {
392                            info!("Permanent timeout");
393                            let result = Err(FinishedExecutionError::PermanentTimeout);
394                            let child_finished =
395                                parent.map(|(parent_execution_id, parent_join_set)| {
396                                    ChildFinishedResponse {
397                                        parent_execution_id,
398                                        parent_join_set,
399                                        result: result.clone(),
400                                    }
401                                });
402                            (
403                                ExecutionEventInner::Finished {
404                                    result,
405                                    http_client_traces,
406                                },
407                                child_finished,
408                                version,
409                            )
410                        }
411                    }
412                    WorkerError::DbError(db_error) => {
413                        return Err(db_error);
414                    }
415                    WorkerError::ActivityTrap {
416                        reason: reason_inner,
417                        trap_kind,
418                        detail,
419                        version,
420                        http_client_traces,
421                    } => retry_or_fail(
422                        result_obtained_at,
423                        parent,
424                        can_be_retried,
425                        reason_full,
426                        reason_inner,
427                        trap_kind, // short reason like `trap` for logs
428                        Some(detail),
429                        version,
430                        http_client_traces,
431                    ),
432                    WorkerError::ActivityPreopenedDirError {
433                        reason_kind,
434                        reason_inner,
435                        version,
436                    } => retry_or_fail(
437                        result_obtained_at,
438                        parent,
439                        can_be_retried,
440                        reason_full,
441                        reason_inner,
442                        reason_kind, // short reason for logs
443                        None,        // detail
444                        version,
445                        None, // http_client_traces
446                    ),
447                    WorkerError::ActivityReturnedError {
448                        detail,
449                        version,
450                        http_client_traces,
451                    } => {
452                        let duration = can_be_retried.expect(
453                            "ActivityReturnedError must not be returned when retries are exhausted",
454                        );
455                        let expires_at = result_obtained_at + duration;
456                        debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
457                        (
458                            ExecutionEventInner::TemporarilyFailed {
459                                backoff_expires_at: expires_at,
460                                reason_full: StrVariant::Static("activity returned error"), // is same as the variant's display message.
461                                reason_inner: StrVariant::Static("activity returned error"),
462                                detail, // contains the backtrace
463                                http_client_traces,
464                            },
465                            None,
466                            version,
467                        )
468                    }
469                    WorkerError::TemporaryWorkflowTrap {
470                        reason: reason_inner,
471                        kind,
472                        detail,
473                        version,
474                    } => {
475                        let duration = can_be_retried.expect("workflows are retried forever");
476                        let expires_at = result_obtained_at + duration;
477                        debug!(
478                            "Retrying workflow {kind} execution after {duration:?} at {expires_at}"
479                        );
480                        (
481                            ExecutionEventInner::TemporarilyFailed {
482                                backoff_expires_at: expires_at,
483                                reason_full: StrVariant::from(reason_full),
484                                reason_inner: StrVariant::from(reason_inner),
485                                detail,
486                                http_client_traces: None,
487                            },
488                            None,
489                            version,
490                        )
491                    }
492                    WorkerError::LimitReached {
493                        reason: inner_reason,
494                        version: new_version,
495                    } => {
496                        let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
497                        warn!(
498                            "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
499                        );
500                        (
501                            ExecutionEventInner::Unlocked {
502                                backoff_expires_at: expires_at,
503                                reason: StrVariant::from(reason_full),
504                            },
505                            None,
506                            new_version,
507                        )
508                    }
509                    WorkerError::FatalError(fatal_error, version) => {
510                        info!("Fatal worker error - {fatal_error:?}");
511                        let result = Err(FinishedExecutionError::from(fatal_error));
512                        let child_finished =
513                            parent.map(|(parent_execution_id, parent_join_set)| {
514                                ChildFinishedResponse {
515                                    parent_execution_id,
516                                    parent_join_set,
517                                    result: result.clone(),
518                                }
519                            });
520                        (
521                            ExecutionEventInner::Finished {
522                                result,
523                                http_client_traces: None,
524                            },
525                            child_finished,
526                            version,
527                        )
528                    }
529                };
530                Some(Append {
531                    created_at: result_obtained_at,
532                    primary_event: AppendRequest {
533                        created_at: result_obtained_at,
534                        event: primary_event,
535                    },
536                    execution_id,
537                    version,
538                    child_finished,
539                })
540            }
541        })
542    }
543}
544
545#[expect(clippy::too_many_arguments)]
546fn retry_or_fail(
547    result_obtained_at: DateTime<Utc>,
548    parent: Option<(ExecutionId, JoinSetId)>,
549    can_be_retried: Option<Duration>,
550    reason_full: String,
551    reason_inner: String,
552    err_kind_for_logs: impl Display,
553    detail: Option<String>,
554    version: Version,
555    http_client_traces: Option<Vec<concepts::storage::http_client_trace::HttpClientTrace>>,
556) -> (ExecutionEventInner, Option<ChildFinishedResponse>, Version) {
557    if let Some(duration) = can_be_retried {
558        let expires_at = result_obtained_at + duration;
559        debug!(
560            "Retrying activity with `{err_kind_for_logs}` execution after {duration:?} at {expires_at}"
561        );
562        (
563            ExecutionEventInner::TemporarilyFailed {
564                backoff_expires_at: expires_at,
565                reason_full: StrVariant::from(reason_full),
566                reason_inner: StrVariant::from(reason_inner),
567                detail,
568                http_client_traces,
569            },
570            None,
571            version,
572        )
573    } else {
574        info!("Activity with `{err_kind_for_logs}` marked as permanent failure - {reason_inner}");
575        let result = Err(FinishedExecutionError::PermanentFailure {
576            reason_inner,
577            reason_full,
578            kind: PermanentFailureKind::ActivityTrap,
579            detail,
580        });
581        let child_finished =
582            parent.map(
583                |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
584                    parent_execution_id,
585                    parent_join_set,
586                    result: result.clone(),
587                },
588            );
589        (
590            ExecutionEventInner::Finished {
591                result,
592                http_client_traces,
593            },
594            child_finished,
595            version,
596        )
597    }
598}
599
/// Response delivered to the parent execution when a child execution finishes.
#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    /// Execution that receives the response (see `Append::append`).
    pub(crate) parent_execution_id: ExecutionId,
    /// Join set of the parent that the response is addressed to.
    pub(crate) parent_join_set: JoinSetId,
    /// The child's final result; in this module it is always a clone of the result stored in
    /// the child's `Finished` event.
    pub(crate) result: FinishedExecutionResult,
}
606
/// One event to append to an execution's log, optionally paired with a parent response.
#[derive(Debug, Clone)]
pub(crate) struct Append {
    /// Timestamp of the append batch; also used for the parent response, if any.
    pub(crate) created_at: DateTime<Utc>,
    /// Event appended to `execution_id`'s log.
    pub(crate) primary_event: AppendRequest,
    pub(crate) execution_id: ExecutionId,
    /// Version passed to the append call; also reported to the parent as `finished_version`.
    pub(crate) version: Version,
    /// Must only be set when `primary_event` is `Finished` and `execution_id` is derived
    /// (both asserted by `Append::append`).
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}
615
616impl Append {
617    pub(crate) async fn append(self, db_connection: &impl DbConnection) -> Result<(), DbError> {
618        if let Some(child_finished) = self.child_finished {
619            assert_matches!(
620                &self.primary_event,
621                AppendRequest {
622                    event: ExecutionEventInner::Finished { .. },
623                    ..
624                }
625            );
626            let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
627            db_connection
628                .append_batch_respond_to_parent(
629                    derived.clone(),
630                    self.created_at,
631                    vec![self.primary_event],
632                    self.version.clone(),
633                    child_finished.parent_execution_id,
634                    JoinSetResponseEventOuter {
635                        created_at: self.created_at,
636                        event: JoinSetResponseEvent {
637                            join_set_id: child_finished.parent_join_set,
638                            event: JoinSetResponse::ChildExecutionFinished {
639                                child_execution_id: derived,
640                                // Since self.primary_event is a finished event, the version will remain the same.
641                                finished_version: self.version,
642                                result: child_finished.result,
643                            },
644                        },
645                    },
646                )
647                .await?;
648        } else {
649            db_connection
650                .append_batch(
651                    self.created_at,
652                    vec![self.primary_event],
653                    self.execution_id,
654                    self.version,
655                )
656                .await?;
657        }
658        Ok(())
659    }
660}
661
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    /// Function exported by a `SimpleWorker` unless overridden with `with_ffqn`.
    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");

    /// Scripted results keyed by the version at which each run is expected.
    /// Entries are taken from the end of the map (`pop`), so the last one is served first.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    /// Export table containing exactly one parameterless, submittable function.
    fn single_export(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
        [FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: None,
            extension: None,
            submittable: true,
        }]
    }

    /// Test worker that replays pre-scripted `WorkerResult`s and asserts on the context.
    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    impl SimpleWorker {
        /// Worker answering a single run, expected at version 2 with an empty event history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let scripted = IndexMap::from([(Version::new(2), (vec![], res))]);
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(scripted)))
        }

        /// Replace the exported function (and `ffqn`) while keeping the scripted results.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            Self {
                exported: single_export(ffqn.clone()),
                ffqn,
                ..self
            }
        }

        /// Worker exporting `FFQN_SOME` and serving the given scripted results.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: single_export(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        /// Pop the next scripted result, asserting the context's version and event history.
        ///
        /// Panics when no scripted result is left or when the expectations do not match.
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            // Kept as one expression so the lock guard lives until the `unwrap`, as before.
            let (expected_version, (expected_eh, worker_result)) =
                self.worker_results_rev.lock().unwrap().pop().unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}
741
742#[cfg(test)]
743mod tests {
744    use self::simple_worker::SimpleWorker;
745    use super::*;
746    use crate::worker::FatalError;
747    use crate::{expired_timers_watcher, worker::WorkerResult};
748    use assert_matches::assert_matches;
749    use async_trait::async_trait;
750    use concepts::storage::{CreateRequest, JoinSetRequest};
751    use concepts::storage::{
752        DbConnection, ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState,
753    };
754    use concepts::time::Now;
755    use concepts::{
756        ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, StrVariant,
757        SupportedFunctionReturnValue, TrapKind,
758    };
759    use db_tests::Database;
760    use indexmap::IndexMap;
761    use simple_worker::FFQN_SOME;
762    use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
763    use test_utils::set_up;
764    use test_utils::sim_clock::{ConstClock, SimClock};
765
766    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
767
768    async fn tick_fn<
769        W: Worker + Debug,
770        C: ClockFn + 'static,
771        DB: DbConnection + 'static,
772        P: DbPool<DB> + 'static,
773    >(
774        config: ExecConfig,
775        clock_fn: C,
776        db_pool: P,
777        worker: Arc<W>,
778        executed_at: DateTime<Utc>,
779    ) -> ExecutionProgress {
780        trace!("Ticking with {worker:?}");
781        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
782        let executor = ExecTask::new(worker, config, clock_fn, db_pool, ffqns);
783        let mut execution_progress = executor.tick(executed_at).await.unwrap();
784        loop {
785            execution_progress
786                .executions
787                .retain(|(_, abort_handle)| !abort_handle.is_finished());
788            if execution_progress.executions.is_empty() {
789                return execution_progress;
790            }
791            tokio::time::sleep(Duration::from_millis(10)).await;
792        }
793    }
794
795    #[tokio::test]
796    async fn execute_simple_lifecycle_tick_based_mem() {
797        let created_at = Now.now();
798        let (_guard, db_pool) = Database::Memory.set_up().await;
799        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
800        db_pool.close().await.unwrap();
801    }
802
803    #[cfg(not(madsim))]
804    #[tokio::test]
805    async fn execute_simple_lifecycle_tick_based_sqlite() {
806        let created_at = Now.now();
807        let (_guard, db_pool) = Database::Sqlite.set_up().await;
808        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
809        db_pool.close().await.unwrap();
810    }
811
812    async fn execute_simple_lifecycle_tick_based<
813        DB: DbConnection + 'static,
814        P: DbPool<DB> + 'static,
815        C: ClockFn + 'static,
816    >(
817        pool: P,
818        clock_fn: C,
819    ) {
820        set_up();
821        let created_at = clock_fn.now();
822        let exec_config = ExecConfig {
823            batch_size: 1,
824            lock_expiry: Duration::from_secs(1),
825            tick_sleep: Duration::from_millis(100),
826            component_id: ComponentId::dummy_activity(),
827            task_limiter: None,
828        };
829
830        let execution_log = create_and_tick(
831            CreateAndTickConfig {
832                execution_id: ExecutionId::generate(),
833                created_at,
834                max_retries: 0,
835                executed_at: created_at,
836                retry_exp_backoff: Duration::ZERO,
837            },
838            clock_fn,
839            pool,
840            exec_config,
841            Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
842                SupportedFunctionReturnValue::None,
843                Version::new(2),
844                None,
845            ))),
846            tick_fn,
847        )
848        .await;
849        assert_matches!(
850            execution_log.events.get(2).unwrap(),
851            ExecutionEvent {
852                event: ExecutionEventInner::Finished {
853                    result: Ok(SupportedFunctionReturnValue::None),
854                    http_client_traces: None
855                },
856                created_at: _,
857                backtrace_id: None,
858            }
859        );
860    }
861
862    #[tokio::test]
863    async fn stochastic_execute_simple_lifecycle_task_based_mem() {
864        set_up();
865        let created_at = Now.now();
866        let clock_fn = ConstClock(created_at);
867        let (_guard, db_pool) = Database::Memory.set_up().await;
868        let exec_config = ExecConfig {
869            batch_size: 1,
870            lock_expiry: Duration::from_secs(1),
871            tick_sleep: Duration::ZERO,
872            component_id: ComponentId::dummy_activity(),
873            task_limiter: None,
874        };
875
876        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
877            SupportedFunctionReturnValue::None,
878            Version::new(2),
879            None,
880        )));
881        let exec_task = ExecTask::spawn_new(
882            worker.clone(),
883            exec_config.clone(),
884            clock_fn,
885            db_pool.clone(),
886            ExecutorId::generate(),
887        );
888
889        let execution_log = create_and_tick(
890            CreateAndTickConfig {
891                execution_id: ExecutionId::generate(),
892                created_at,
893                max_retries: 0,
894                executed_at: created_at,
895                retry_exp_backoff: Duration::ZERO,
896            },
897            clock_fn,
898            db_pool.clone(),
899            exec_config,
900            worker,
901            |_, _, _, _, _| async {
902                tokio::time::sleep(Duration::from_secs(1)).await; // non deterministic if not run in madsim
903                ExecutionProgress::default()
904            },
905        )
906        .await;
907        exec_task.close().await;
908        db_pool.close().await.unwrap();
909        assert_matches!(
910            execution_log.events.get(2).unwrap(),
911            ExecutionEvent {
912                event: ExecutionEventInner::Finished {
913                    result: Ok(SupportedFunctionReturnValue::None),
914                    http_client_traces: None
915                },
916                created_at: _,
917                backtrace_id: None,
918            }
919        );
920    }
921
922    struct CreateAndTickConfig {
923        execution_id: ExecutionId,
924        created_at: DateTime<Utc>,
925        max_retries: u32,
926        executed_at: DateTime<Utc>,
927        retry_exp_backoff: Duration,
928    }
929
930    async fn create_and_tick<
931        W: Worker,
932        C: ClockFn,
933        DB: DbConnection,
934        P: DbPool<DB>,
935        T: FnMut(ExecConfig, C, P, Arc<W>, DateTime<Utc>) -> F,
936        F: Future<Output = ExecutionProgress>,
937    >(
938        config: CreateAndTickConfig,
939        clock_fn: C,
940        db_pool: P,
941        exec_config: ExecConfig,
942        worker: Arc<W>,
943        mut tick: T,
944    ) -> ExecutionLog {
945        // Create an execution
946        let db_connection = db_pool.connection();
947        db_connection
948            .create(CreateRequest {
949                created_at: config.created_at,
950                execution_id: config.execution_id.clone(),
951                ffqn: FFQN_SOME,
952                params: Params::empty(),
953                parent: None,
954                metadata: concepts::ExecutionMetadata::empty(),
955                scheduled_at: config.created_at,
956                retry_exp_backoff: config.retry_exp_backoff,
957                max_retries: config.max_retries,
958                component_id: ComponentId::dummy_activity(),
959                scheduled_by: None,
960            })
961            .await
962            .unwrap();
963        // execute!
964        tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
965        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
966        debug!("Execution history after tick: {execution_log:?}");
967        // check that DB contains Created and Locked events.
968        assert_matches!(
969            execution_log.events.first().unwrap(),
970            ExecutionEvent {
971                event: ExecutionEventInner::Created { .. },
972                created_at: actually_created_at,
973                backtrace_id: None,
974            }
975            if config.created_at == *actually_created_at
976        );
977        let locked_at = assert_matches!(
978            execution_log.events.get(1).unwrap(),
979            ExecutionEvent {
980                event: ExecutionEventInner::Locked { .. },
981                created_at: locked_at,
982                backtrace_id: None,
983            } if config.created_at <= *locked_at
984            => *locked_at
985        );
986        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
987            event: _,
988            created_at: executed_at,
989            backtrace_id: None,
990        } if *executed_at >= locked_at);
991        execution_log
992    }
993
    /// An activity trap with one retry left must be recorded as `TemporarilyFailed` with a
    /// backoff. The execution is then picked up again once the backoff has elapsed on the
    /// simulated clock, and it finishes successfully on the retry.
    #[tokio::test]
    async fn activity_trap_should_trigger_an_execution_retry() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };
        let expected_reason = "error reason";
        let expected_detail = "error detail";
        // First run: the worker traps at version 2 (created = 0, locked = 1).
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: expected_detail.to_string(),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        let retry_exp_backoff = Duration::from_millis(100);
        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at: sim_clock.now(),
                max_retries: 1,
                executed_at: sim_clock.now(),
                retry_exp_backoff,
            },
            sim_clock.clone(),
            db_pool.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        // Created, Locked, TemporarilyFailed.
        assert_eq!(3, execution_log.events.len());
        {
            let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
                &execution_log.events.get(2).unwrap(),
                ExecutionEvent {
                    event: ExecutionEventInner::TemporarilyFailed {
                        reason_inner,
                        reason_full,
                        detail,
                        backoff_expires_at,
                        http_client_traces: None,
                    },
                    created_at: at,
                    backtrace_id: None,
                }
                => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
            );
            assert_eq!(expected_reason, reason_inner.deref());
            assert_eq!(
                format!("activity trap: {expected_reason}"),
                reason_full.deref()
            );
            assert_eq!(Some(expected_detail), detail.as_deref());
            assert_eq!(at, sim_clock.now());
            // The first retry is scheduled exactly one `retry_exp_backoff` after the failure.
            assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
        }
        // Second run: expected at version 4 (after TemporarilyFailed = 2 and the new Locked = 3)
        // with an empty event history, and it succeeds.
        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
            std::sync::Mutex::new(IndexMap::from([(
                Version::new(4),
                (
                    vec![],
                    WorkerResult::Ok(SupportedFunctionReturnValue::None, Version::new(4), None),
                ),
            )])),
        )));
        // noop until `retry_exp_backoff` expires
        // NOTE(review): this only detects a prematurely started execution if `tick_fn` hands back
        // the handles it started; if it drops finished handles before returning, the assertion
        // holds trivially — confirm against `tick_fn`.
        assert!(
            tick_fn(
                exec_config.clone(),
                sim_clock.clone(),
                db_pool.clone(),
                worker.clone(),
                sim_clock.now(),
            )
            .await
            .executions
            .is_empty()
        );
        // tick again to finish the execution
        sim_clock.move_time_forward(retry_exp_backoff).await;
        tick_fn(
            exec_config,
            sim_clock.clone(),
            db_pool.clone(),
            worker,
            sim_clock.now(),
        )
        .await;
        let execution_log = {
            let db_connection = db_pool.connection();
            db_connection
                .get(&execution_log.execution_id)
                .await
                .unwrap()
        };
        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
        // Events 3 and 4: the retry is locked and finished at the advanced clock time.
        assert_matches!(
            execution_log.events.get(3).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: at,
                backtrace_id: None,
            } if *at == sim_clock.now()
        );
        assert_matches!(
            execution_log.events.get(4).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Ok(SupportedFunctionReturnValue::None),
                    http_client_traces: None
                },
                created_at: finished_at,
                backtrace_id: None,
            } if *finished_at == sim_clock.now()
        );
        db_pool.close().await.unwrap();
    }
1121
1122    #[tokio::test]
1123    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1124        set_up();
1125        let created_at = Now.now();
1126        let clock_fn = ConstClock(created_at);
1127        let (_guard, db_pool) = Database::Memory.set_up().await;
1128        let exec_config = ExecConfig {
1129            batch_size: 1,
1130            lock_expiry: Duration::from_secs(1),
1131            tick_sleep: Duration::ZERO,
1132            component_id: ComponentId::dummy_activity(),
1133            task_limiter: None,
1134        };
1135
1136        let expected_reason = "error reason";
1137        let expected_detail = "error detail";
1138        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1139            WorkerError::ActivityTrap {
1140                reason: expected_reason.to_string(),
1141                trap_kind: concepts::TrapKind::Trap,
1142                detail: expected_detail.to_string(),
1143                version: Version::new(2),
1144                http_client_traces: None,
1145            },
1146        )));
1147        let execution_log = create_and_tick(
1148            CreateAndTickConfig {
1149                execution_id: ExecutionId::generate(),
1150                created_at,
1151                max_retries: 0,
1152                executed_at: created_at,
1153                retry_exp_backoff: Duration::ZERO,
1154            },
1155            clock_fn,
1156            db_pool.clone(),
1157            exec_config.clone(),
1158            worker,
1159            tick_fn,
1160        )
1161        .await;
1162        assert_eq!(3, execution_log.events.len());
1163        let (reason, kind, detail) = assert_matches!(
1164            &execution_log.events.get(2).unwrap(),
1165            ExecutionEvent {
1166                event: ExecutionEventInner::Finished{
1167                    result: Err(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_}),
1168                    http_client_traces: None
1169                },
1170                created_at: at,
1171                backtrace_id: None,
1172            } if *at == created_at
1173            => (reason_inner, kind, detail)
1174        );
1175        assert_eq!(expected_reason, *reason);
1176        assert_eq!(Some(expected_detail), detail.as_deref());
1177        assert_eq!(PermanentFailureKind::ActivityTrap, *kind);
1178
1179        db_pool.close().await.unwrap();
1180    }
1181
1182    #[tokio::test]
1183    async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1184        let worker_error = WorkerError::ActivityTrap {
1185            reason: "error reason".to_string(),
1186            trap_kind: TrapKind::Trap,
1187            detail: "detail".to_string(),
1188            version: Version::new(2),
1189            http_client_traces: None,
1190        };
1191        let expected_child_err = FinishedExecutionError::PermanentFailure {
1192            reason_full: "activity trap: error reason".to_string(),
1193            reason_inner: "error reason".to_string(),
1194            kind: PermanentFailureKind::ActivityTrap,
1195            detail: Some("detail".to_string()),
1196        };
1197        child_execution_permanently_failed_should_notify_parent(
1198            WorkerResult::Err(worker_error),
1199            expected_child_err,
1200        )
1201        .await;
1202    }
1203
1204    // TODO: Add test for WorkerError::TemporaryTimeout
1205
1206    #[tokio::test]
1207    async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1208        let expected_child_err = FinishedExecutionError::PermanentTimeout;
1209        child_execution_permanently_failed_should_notify_parent(
1210            WorkerResult::DbUpdatedByWorkerOrWatcher,
1211            expected_child_err,
1212        )
1213        .await;
1214    }
1215
1216    #[tokio::test]
1217    async fn child_execution_permanently_failed_should_notify_parent_unhandled_child() {
1218        let parent_id = ExecutionId::from_parts(1, 1);
1219        let join_set_id_outer =
1220            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("outer")).unwrap();
1221        let root_cause_id = parent_id.next_level(&join_set_id_outer);
1222        let join_set_id_inner =
1223            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("inner")).unwrap();
1224        let child_execution_id = root_cause_id.next_level(&join_set_id_inner);
1225        let worker_error = WorkerError::FatalError(
1226            FatalError::UnhandledChildExecutionError {
1227                child_execution_id: child_execution_id.clone(),
1228                root_cause_id: root_cause_id.clone(),
1229            },
1230            Version::new(2),
1231        );
1232        let expected_child_err = FinishedExecutionError::UnhandledChildExecutionError {
1233            child_execution_id,
1234            root_cause_id,
1235        };
1236        child_execution_permanently_failed_should_notify_parent(
1237            WorkerResult::Err(worker_error),
1238            expected_child_err,
1239        )
1240        .await;
1241    }
1242
    /// Shared scenario: a child execution finishes with an error and the parent must be woken up.
    ///
    /// Steps:
    /// 1. Create a parent execution and tick it once with a parent worker that returns
    ///    `DbUpdatedByWorkerOrWatcher`.
    /// 2. Append, on the parent's behalf, a `JoinSetCreate`, a child `JoinSetRequest` and a
    ///    `JoinNext`, creating the child execution in the same batch.
    /// 3. Tick the child with a worker returning `worker_result`. When `expected_child_err` is
    ///    `PermanentTimeout`, also advance the clock by `LOCK_EXPIRY` and run the expired timers
    ///    watcher.
    /// 4. Assert that the child finished with `expected_child_err`.
    /// 5. Assert that the parent is pending again at the current time and that its last
    ///    response is `ChildExecutionFinished` for this child.
    async fn child_execution_permanently_failed_should_notify_parent(
        worker_result: WorkerResult,
        expected_child_err: FinishedExecutionError,
    ) {
        use concepts::storage::JoinSetResponseEventOuter;
        const LOCK_EXPIRY: Duration = Duration::from_secs(1);

        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;

        let parent_worker = Arc::new(SimpleWorker::with_single_result(
            WorkerResult::DbUpdatedByWorkerOrWatcher,
        ));
        let parent_execution_id = ExecutionId::generate();
        db_pool
            .connection()
            .create(CreateRequest {
                created_at: sim_clock.now(),
                execution_id: parent_execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: Duration::ZERO,
                max_retries: 0,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();
        // Lock the parent (created = 0, locked = 1), so the next append below is at version 2.
        tick_fn(
            ExecConfig {
                batch_size: 1,
                lock_expiry: LOCK_EXPIRY,
                tick_sleep: Duration::ZERO,
                component_id: ComponentId::dummy_activity(),
                task_limiter: None,
            },
            sim_clock.clone(),
            db_pool.clone(),
            parent_worker,
            sim_clock.now(),
        )
        .await;

        let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
        let child_execution_id = parent_execution_id.next_level(&join_set_id);
        // executor does not append anything, this should have been written by the worker:
        {
            let child = CreateRequest {
                created_at: sim_clock.now(),
                execution_id: ExecutionId::Derived(child_execution_id.clone()),
                ffqn: FFQN_CHILD,
                params: Params::empty(),
                parent: Some((parent_execution_id.clone(), join_set_id.clone())),
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: Duration::ZERO,
                max_retries: 0,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            };
            let current_time = sim_clock.now();
            let join_set = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinSetCreate {
                        join_set_id: join_set_id.clone(),
                        closing_strategy: ClosingStrategy::Complete,
                    },
                },
            };
            let child_exec_req = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinSetRequest {
                        join_set_id: join_set_id.clone(),
                        request: JoinSetRequest::ChildExecutionRequest {
                            child_execution_id: child_execution_id.clone(),
                        },
                    },
                },
            };
            // The parent waits on the join set; `run_expires_at` is the current time.
            let join_next = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinNext {
                        join_set_id: join_set_id.clone(),
                        run_expires_at: sim_clock.now(),
                        closing: false,
                    },
                },
            };
            // Append the three parent events and create the child in one batch.
            db_pool
                .connection()
                .append_batch_create_new_execution(
                    current_time,
                    vec![join_set, child_exec_req, join_next],
                    parent_execution_id.clone(),
                    Version::new(2),
                    vec![child],
                )
                .await
                .unwrap();
        }

        let child_worker =
            Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));

        // execute the child
        tick_fn(
            ExecConfig {
                batch_size: 1,
                lock_expiry: LOCK_EXPIRY,
                tick_sleep: Duration::ZERO,
                component_id: ComponentId::dummy_activity(),
                task_limiter: None,
            },
            sim_clock.clone(),
            db_pool.clone(),
            child_worker,
            sim_clock.now(),
        )
        .await;
        if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
            // In case of timeout, let the timers watcher handle it
            sim_clock.move_time_forward(LOCK_EXPIRY).await;
            expired_timers_watcher::tick(&db_pool.connection(), sim_clock.now())
                .await
                .unwrap();
        }
        let child_log = db_pool
            .connection()
            .get(&ExecutionId::Derived(child_execution_id.clone()))
            .await
            .unwrap();
        assert!(child_log.pending_state.is_finished());
        assert_eq!(
            Version(2),
            child_log.next_version,
            "created = 0, locked = 1, with_single_result = 2"
        );
        assert_eq!(
            ExecutionEventInner::Finished {
                result: Err(expected_child_err),
                http_client_traces: None
            },
            child_log.last_event().event
        );
        let parent_log = db_pool
            .connection()
            .get(&parent_execution_id)
            .await
            .unwrap();
        assert_matches!(
            parent_log.pending_state,
            PendingState::PendingAt {
                scheduled_at
            } if scheduled_at == sim_clock.now(),
            "parent should be back to pending"
        );
        // The child's outcome must be the parent's latest join set response.
        let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
            parent_log.responses.last(),
            Some(JoinSetResponseEventOuter{
                created_at: at,
                event: JoinSetResponseEvent{
                    join_set_id: found_join_set_id,
                    event: JoinSetResponse::ChildExecutionFinished {
                        child_execution_id: found_child_execution_id,
                        finished_version,
                        result: found_result,
                    }
                }
            })
             if *at == sim_clock.now()
            => (found_join_set_id, found_child_execution_id, finished_version, found_result)
        );
        assert_eq!(join_set_id, *found_join_set_id);
        assert_eq!(child_execution_id, *found_child_execution_id);
        assert_eq!(child_log.next_version, *child_finished_version);
        assert!(found_result.is_err());

        db_pool.close().await.unwrap();
    }
1429
1430    #[derive(Clone, Debug)]
1431    struct SleepyWorker {
1432        duration: Duration,
1433        result: SupportedFunctionReturnValue,
1434        exported: [FunctionMetadata; 1],
1435    }
1436
1437    #[async_trait]
1438    impl Worker for SleepyWorker {
1439        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1440            tokio::time::sleep(self.duration).await;
1441            WorkerResult::Ok(self.result.clone(), ctx.version, None)
1442        }
1443
1444        fn exported_functions(&self) -> &[FunctionMetadata] {
1445            &self.exported
1446        }
1447    }
1448
    /// An execution whose worker outlives its lock must be timed out by the expired timers
    /// watcher and retried after the backoff. Each executor tick made while the lock is still
    /// held must not start a new execution.
    ///
    /// The worker sleeps on tokio time while the DB timestamps come from `SimClock`. The spawned
    /// tasks therefore presumably keep sleeping while the simulated clock is advanced — which is
    /// what the `!is_finished()` checks rely on.
    #[tokio::test]
    async fn hanging_lock_should_be_cleaned_and_execution_retried() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let lock_expiry = Duration::from_millis(100);
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry,
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };

        let worker = Arc::new(SleepyWorker {
            duration: lock_expiry + Duration::from_millis(1), // sleep more than allowed by the lock expiry
            result: SupportedFunctionReturnValue::None,
            exported: [FunctionMetadata {
                ffqn: FFQN_SOME,
                parameter_types: ParameterTypes::default(),
                return_type: None,
                extension: None,
                submittable: true,
            }],
        });
        // Create an execution
        let execution_id = ExecutionId::generate();
        // Used as `retry_exp_backoff`: the delay before the timed-out execution is pending again.
        let timeout_duration = Duration::from_millis(300);
        let db_connection = db_pool.connection();
        db_connection
            .create(CreateRequest {
                created_at: sim_clock.now(),
                execution_id: execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: timeout_duration,
                max_retries: 1,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();

        // One executor is reused for every tick so that its in-flight task handles stay visible.
        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
        let executor = ExecTask::new(
            worker,
            exec_config,
            sim_clock.clone(),
            db_pool.clone(),
            ffqns,
        );
        let mut first_execution_progress = executor.tick(sim_clock.now()).await.unwrap();
        assert_eq!(1, first_execution_progress.executions.len());
        // Started hanging, wait for lock expiry.
        sim_clock.move_time_forward(lock_expiry).await;
        // cleanup should be called
        // The executor tick itself must not start anything; the expired lock is released by
        // the expired timers watcher below.
        let now_after_first_lock_expiry = sim_clock.now();
        {
            debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
            let cleanup_progress = executor.tick(now_after_first_lock_expiry).await.unwrap();
            assert!(cleanup_progress.executions.is_empty());
        }
        {
            let expired_locks =
                expired_timers_watcher::tick(&db_pool.connection(), now_after_first_lock_expiry)
                    .await
                    .unwrap()
                    .expired_locks;
            assert_eq!(1, expired_locks);
        }
        // The original hanging task has not completed yet.
        assert!(
            !first_execution_progress
                .executions
                .pop()
                .unwrap()
                .1
                .is_finished()
        );

        // The watcher recorded a temporary timeout and rescheduled the execution after the backoff.
        let execution_log = db_connection.get(&execution_id).await.unwrap();
        let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
        assert_matches!(
            &execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
                created_at: at,
                backtrace_id: None,
            } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
        );
        assert_eq!(
            PendingState::PendingAt {
                scheduled_at: expected_first_timeout_expiry
            },
            execution_log.pending_state
        );
        sim_clock.move_time_forward(timeout_duration).await;
        let now_after_first_timeout = sim_clock.now();
        debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");

        let mut second_execution_progress = executor.tick(now_after_first_timeout).await.unwrap();
        assert_eq!(1, second_execution_progress.executions.len());

        // Started hanging, wait for lock expiry.
        sim_clock.move_time_forward(lock_expiry).await;
        // cleanup should be called
        let now_after_second_lock_expiry = sim_clock.now();
        debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
        {
            let cleanup_progress = executor.tick(now_after_second_lock_expiry).await.unwrap();
            assert!(cleanup_progress.executions.is_empty());
        }
        {
            let expired_locks =
                expired_timers_watcher::tick(&db_pool.connection(), now_after_second_lock_expiry)
                    .await
                    .unwrap()
                    .expired_locks;
            assert_eq!(1, expired_locks);
        }
        assert!(
            !second_execution_progress
                .executions
                .pop()
                .unwrap()
                .1
                .is_finished()
        );
        // NOTE(review): the debug message above expects a permanent timeout after the second
        // expiry, but no assertion checks the final execution state — consider adding one.

        drop(db_connection);
        drop(executor);
        db_pool.close().await.unwrap();
    }
1584}