obeli_sk_executor/
executor.rs

1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::prefixed_ulid::RunId;
5use concepts::storage::{
6    AppendRequest, DbPool, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
7    LockedExecution,
8};
9use concepts::time::ClockFn;
10use concepts::{ComponentId, FinishedExecutionResult, FunctionMetadata, StrVariant};
11use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
12use concepts::{
13    FinishedExecutionError,
14    storage::{DbConnection, DbError, ExecutionEventInner, JoinSetResponse, Version},
15};
16use concepts::{JoinSetId, PermanentFailureKind};
17use std::fmt::Display;
18use std::{
19    sync::{
20        Arc,
21        atomic::{AtomicBool, Ordering},
22    },
23    time::Duration,
24};
25use tokio::task::{AbortHandle, JoinHandle};
26use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
27
28#[derive(Debug, Clone)]
29pub struct ExecConfig {
30    pub lock_expiry: Duration,
31    pub tick_sleep: Duration,
32    pub batch_size: u32,
33    pub component_id: ComponentId,
34    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
35    pub executor_id: ExecutorId,
36}
37
38pub struct ExecTask<C: ClockFn> {
39    worker: Arc<dyn Worker>,
40    config: ExecConfig,
41    clock_fn: C, // Used for obtaining current time when the execution finishes.
42    db_pool: Arc<dyn DbPool>,
43    ffqns: Arc<[FunctionFqn]>,
44}
45
46#[derive(derive_more::Debug, Default)]
47pub struct ExecutionProgress {
48    #[debug(skip)]
49    #[allow(dead_code)]
50    executions: Vec<(ExecutionId, JoinHandle<()>)>,
51}
52
53impl ExecutionProgress {
54    #[cfg(feature = "test")]
55    pub async fn wait_for_tasks(self) -> Result<usize, tokio::task::JoinError> {
56        let execs = self.executions.len();
57        for (_, handle) in self.executions {
58            handle.await?;
59        }
60        Ok(execs)
61    }
62}
63
64#[derive(derive_more::Debug)]
65pub struct ExecutorTaskHandle {
66    #[debug(skip)]
67    is_closing: Arc<AtomicBool>,
68    #[debug(skip)]
69    abort_handle: AbortHandle,
70    component_id: ComponentId,
71    executor_id: ExecutorId,
72}
73
74impl ExecutorTaskHandle {
75    #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
76    pub async fn close(&self) {
77        trace!("Gracefully closing");
78        self.is_closing.store(true, Ordering::Relaxed);
79        while !self.abort_handle.is_finished() {
80            tokio::time::sleep(Duration::from_millis(1)).await;
81        }
82        debug!("Gracefully closed");
83    }
84}
85
86impl Drop for ExecutorTaskHandle {
87    #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
88    fn drop(&mut self) {
89        if self.abort_handle.is_finished() {
90            return;
91        }
92        warn!("Aborting the executor task");
93        self.abort_handle.abort();
94    }
95}
96
/// Test-feature entry point exposing `extract_exported_ffqns_noext` outside this module.
#[cfg(feature = "test")]
pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    self::extract_exported_ffqns_noext(worker)
}
101
102fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
103    worker
104        .exported_functions()
105        .iter()
106        .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
107        .collect::<Arc<_>>()
108}
109
110impl<C: ClockFn + 'static> ExecTask<C> {
111    #[cfg(feature = "test")]
112    pub fn new(
113        worker: Arc<dyn Worker>,
114        config: ExecConfig,
115        clock_fn: C,
116        db_pool: Arc<dyn DbPool>,
117        ffqns: Arc<[FunctionFqn]>,
118    ) -> Self {
119        Self {
120            worker,
121            config,
122            clock_fn,
123            db_pool,
124            ffqns,
125        }
126    }
127
128    pub fn spawn_new(
129        worker: Arc<dyn Worker>,
130        config: ExecConfig,
131        clock_fn: C,
132        db_pool: Arc<dyn DbPool>,
133    ) -> ExecutorTaskHandle {
134        let is_closing = Arc::new(AtomicBool::default());
135        let is_closing_inner = is_closing.clone();
136        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
137        let component_id = config.component_id.clone();
138        let executor_id = config.executor_id;
139        let abort_handle = tokio::spawn(async move {
140            debug!(executor_id = %config.executor_id, component_id = %config.component_id, "Spawned executor");
141            let task = Self {
142                worker,
143                config,
144                db_pool,
145                ffqns: ffqns.clone(),
146                clock_fn: clock_fn.clone(),
147            };
148            loop {
149                let _ = task.tick(clock_fn.now(), RunId::generate()).await;
150                let executed_at = clock_fn.now();
151                task.db_pool
152                    .connection()
153                    .wait_for_pending(executed_at, ffqns.clone(), task.config.tick_sleep)
154                    .await;
155                if is_closing_inner.load(Ordering::Relaxed) {
156                    return;
157                }
158            }
159        })
160        .abort_handle();
161        ExecutorTaskHandle {
162            is_closing,
163            abort_handle,
164            component_id,
165            executor_id,
166        }
167    }
168
169    fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
170        if let Some(task_limiter) = &self.config.task_limiter {
171            let mut locks = Vec::new();
172            for _ in 0..self.config.batch_size {
173                if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
174                    locks.push(Some(permit));
175                } else {
176                    break;
177                }
178            }
179            locks
180        } else {
181            let mut vec = Vec::with_capacity(self.config.batch_size as usize);
182            for _ in 0..self.config.batch_size {
183                vec.push(None);
184            }
185            vec
186        }
187    }
188
189    #[cfg(feature = "test")]
190    pub async fn tick_test(
191        &self,
192        executed_at: DateTime<Utc>,
193        run_id: RunId,
194    ) -> Result<ExecutionProgress, ()> {
195        self.tick(executed_at, run_id).await
196    }
197
198    #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.config.executor_id, component_id = %self.config.component_id))]
199    async fn tick(
200        &self,
201        executed_at: DateTime<Utc>,
202        run_id: RunId,
203    ) -> Result<ExecutionProgress, ()> {
204        let locked_executions = {
205            let mut permits = self.acquire_task_permits();
206            if permits.is_empty() {
207                return Ok(ExecutionProgress::default());
208            }
209            let db_connection = self.db_pool.connection();
210            let lock_expires_at = executed_at + self.config.lock_expiry;
211            let locked_executions = db_connection
212                .lock_pending(
213                    permits.len(), // batch size
214                    executed_at,   // fetch expiring before now
215                    self.ffqns.clone(),
216                    executed_at, // created at
217                    self.config.component_id.clone(),
218                    self.config.executor_id,
219                    lock_expires_at,
220                    run_id,
221                )
222                .await
223                .map_err(|err| {
224                    warn!("lock_pending error {err:?}");
225                })?;
226            // Drop permits if too many were allocated.
227            while permits.len() > locked_executions.len() {
228                permits.pop();
229            }
230            assert_eq!(permits.len(), locked_executions.len());
231            locked_executions.into_iter().zip(permits)
232        };
233        let execution_deadline = executed_at + self.config.lock_expiry;
234
235        let mut executions = Vec::with_capacity(locked_executions.len());
236        for (locked_execution, permit) in locked_executions {
237            let execution_id = locked_execution.execution_id.clone();
238            let join_handle = {
239                let worker = self.worker.clone();
240                let db_pool = self.db_pool.clone();
241                let clock_fn = self.clock_fn.clone();
242                let run_id = locked_execution.run_id;
243                let worker_span = info_span!(parent: None, "worker",
244                    "otel.name" = format!("worker {}", locked_execution.ffqn),
245                    %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.config.executor_id, component_id = %self.config.component_id);
246                locked_execution.metadata.enrich(&worker_span);
247                tokio::spawn({
248                    let worker_span2 = worker_span.clone();
249                    async move {
250                        let _permit = permit;
251                        let res = Self::run_worker(
252                            worker,
253                            db_pool.as_ref(),
254                            execution_deadline,
255                            clock_fn,
256                            locked_execution,
257                            worker_span2,
258                        )
259                        .await;
260                        if let Err(db_error) = res {
261                            error!("Got DbError `{db_error:?}`, expecting watcher to mark execution as timed out");
262                        }
263                    }
264                    .instrument(worker_span)
265                })
266            };
267            executions.push((execution_id, join_handle));
268        }
269        Ok(ExecutionProgress { executions })
270    }
271
272    async fn run_worker(
273        worker: Arc<dyn Worker>,
274        db_pool: &dyn DbPool,
275        execution_deadline: DateTime<Utc>,
276        clock_fn: C,
277        locked_execution: LockedExecution,
278        worker_span: Span,
279    ) -> Result<(), DbError> {
280        debug!("Worker::run starting");
281        trace!(
282            version = %locked_execution.version,
283            params = ?locked_execution.params,
284            event_history = ?locked_execution.event_history,
285            "Worker::run starting"
286        );
287        let can_be_retried = ExecutionLog::can_be_retried_after(
288            locked_execution.temporary_event_count + 1,
289            locked_execution.max_retries,
290            locked_execution.retry_exp_backoff,
291        );
292        let unlock_expiry_on_limit_reached =
293            ExecutionLog::compute_retry_duration_when_retrying_forever(
294                locked_execution.temporary_event_count + 1,
295                locked_execution.retry_exp_backoff,
296            );
297        let ctx = WorkerContext {
298            execution_id: locked_execution.execution_id.clone(),
299            metadata: locked_execution.metadata,
300            ffqn: locked_execution.ffqn,
301            params: locked_execution.params,
302            event_history: locked_execution.event_history,
303            responses: locked_execution
304                .responses
305                .into_iter()
306                .map(|outer| outer.event)
307                .collect(),
308            version: locked_execution.version,
309            execution_deadline,
310            can_be_retried: can_be_retried.is_some(),
311            run_id: locked_execution.run_id,
312            worker_span,
313        };
314        let worker_result = worker.run(ctx).await;
315        trace!(?worker_result, "Worker::run finished");
316        let result_obtained_at = clock_fn.now();
317        match Self::worker_result_to_execution_event(
318            locked_execution.execution_id,
319            worker_result,
320            result_obtained_at,
321            locked_execution.parent,
322            can_be_retried,
323            unlock_expiry_on_limit_reached,
324        )? {
325            Some(append) => {
326                let db_connection = db_pool.connection();
327                trace!("Appending {append:?}");
328                append.append(db_connection.as_ref()).await
329            }
330            None => Ok(()),
331        }
332    }
333
334    // FIXME: On a slow execution: race between `expired_timers_watcher` this if retry_exp_backoff is 0.
335    /// Map the `WorkerError` to an temporary or a permanent failure.
336    fn worker_result_to_execution_event(
337        execution_id: ExecutionId,
338        worker_result: WorkerResult,
339        result_obtained_at: DateTime<Utc>,
340        parent: Option<(ExecutionId, JoinSetId)>,
341        can_be_retried: Option<Duration>,
342        unlock_expiry_on_limit_reached: Duration,
343    ) -> Result<Option<Append>, DbError> {
344        Ok(match worker_result {
345            WorkerResult::Ok(result, new_version, http_client_traces) => {
346                info!(
347                    "Execution finished: {}",
348                    result.as_pending_state_finished_result()
349                );
350                let child_finished =
351                    parent.map(
352                        |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
353                            parent_execution_id,
354                            parent_join_set,
355                            result: Ok(result.clone()),
356                        },
357                    );
358                let primary_event = AppendRequest {
359                    created_at: result_obtained_at,
360                    event: ExecutionEventInner::Finished {
361                        result: Ok(result),
362                        http_client_traces,
363                    },
364                };
365
366                Some(Append {
367                    created_at: result_obtained_at,
368                    primary_event,
369                    execution_id,
370                    version: new_version,
371                    child_finished,
372                })
373            }
374            WorkerResult::DbUpdatedByWorkerOrWatcher => None,
375            WorkerResult::Err(err) => {
376                let reason_full = err.to_string(); // WorkerError.display() usually contains a variant specific prefix + inner `reason`.
377                let (primary_event, child_finished, version) = match err {
378                    WorkerError::TemporaryTimeout {
379                        http_client_traces,
380                        version,
381                    } => {
382                        if let Some(duration) = can_be_retried {
383                            let backoff_expires_at = result_obtained_at + duration;
384                            info!(
385                                "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
386                            );
387                            (
388                                ExecutionEventInner::TemporarilyTimedOut {
389                                    backoff_expires_at,
390                                    http_client_traces,
391                                },
392                                None,
393                                version,
394                            )
395                        } else {
396                            info!("Permanent timeout");
397                            let result = Err(FinishedExecutionError::PermanentTimeout);
398                            let child_finished =
399                                parent.map(|(parent_execution_id, parent_join_set)| {
400                                    ChildFinishedResponse {
401                                        parent_execution_id,
402                                        parent_join_set,
403                                        result: result.clone(),
404                                    }
405                                });
406                            (
407                                ExecutionEventInner::Finished {
408                                    result,
409                                    http_client_traces,
410                                },
411                                child_finished,
412                                version,
413                            )
414                        }
415                    }
416                    WorkerError::DbError(db_error) => {
417                        return Err(db_error);
418                    }
419                    WorkerError::ActivityTrap {
420                        reason: reason_inner,
421                        trap_kind,
422                        detail,
423                        version,
424                        http_client_traces,
425                    } => retry_or_fail(
426                        result_obtained_at,
427                        parent,
428                        can_be_retried,
429                        reason_full,
430                        reason_inner,
431                        trap_kind, // short reason like `trap` for logs
432                        detail,
433                        version,
434                        http_client_traces,
435                    ),
436                    WorkerError::ActivityPreopenedDirError {
437                        reason_kind,
438                        reason_inner,
439                        version,
440                    } => retry_or_fail(
441                        result_obtained_at,
442                        parent,
443                        can_be_retried,
444                        reason_full,
445                        reason_inner,
446                        reason_kind, // short reason for logs
447                        None,        // detail
448                        version,
449                        None, // http_client_traces
450                    ),
451                    WorkerError::ActivityReturnedError {
452                        detail,
453                        version,
454                        http_client_traces,
455                    } => {
456                        let duration = can_be_retried.expect(
457                            "ActivityReturnedError must not be returned when retries are exhausted",
458                        );
459                        let expires_at = result_obtained_at + duration;
460                        debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
461                        (
462                            ExecutionEventInner::TemporarilyFailed {
463                                backoff_expires_at: expires_at,
464                                reason_full: StrVariant::Static("activity returned error"), // is same as the variant's display message.
465                                reason_inner: StrVariant::Static("activity returned error"),
466                                detail, // contains the backtrace
467                                http_client_traces,
468                            },
469                            None,
470                            version,
471                        )
472                    }
473                    WorkerError::LimitReached {
474                        reason: inner_reason,
475                        version: new_version,
476                    } => {
477                        let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
478                        warn!(
479                            "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
480                        );
481                        (
482                            ExecutionEventInner::Unlocked {
483                                backoff_expires_at: expires_at,
484                                reason: StrVariant::from(reason_full),
485                            },
486                            None,
487                            new_version,
488                        )
489                    }
490                    WorkerError::FatalError(fatal_error, version) => {
491                        info!("Fatal worker error - {fatal_error:?}");
492                        let result = Err(FinishedExecutionError::from(fatal_error));
493                        let child_finished =
494                            parent.map(|(parent_execution_id, parent_join_set)| {
495                                ChildFinishedResponse {
496                                    parent_execution_id,
497                                    parent_join_set,
498                                    result: result.clone(),
499                                }
500                            });
501                        (
502                            ExecutionEventInner::Finished {
503                                result,
504                                http_client_traces: None,
505                            },
506                            child_finished,
507                            version,
508                        )
509                    }
510                };
511                Some(Append {
512                    created_at: result_obtained_at,
513                    primary_event: AppendRequest {
514                        created_at: result_obtained_at,
515                        event: primary_event,
516                    },
517                    execution_id,
518                    version,
519                    child_finished,
520                })
521            }
522        })
523    }
524}
525
526#[expect(clippy::too_many_arguments)]
527fn retry_or_fail(
528    result_obtained_at: DateTime<Utc>,
529    parent: Option<(ExecutionId, JoinSetId)>,
530    can_be_retried: Option<Duration>,
531    reason_full: String,
532    reason_inner: String,
533    err_kind_for_logs: impl Display,
534    detail: Option<String>,
535    version: Version,
536    http_client_traces: Option<Vec<concepts::storage::http_client_trace::HttpClientTrace>>,
537) -> (ExecutionEventInner, Option<ChildFinishedResponse>, Version) {
538    if let Some(duration) = can_be_retried {
539        let expires_at = result_obtained_at + duration;
540        debug!(
541            "Retrying activity with `{err_kind_for_logs}` execution after {duration:?} at {expires_at}"
542        );
543        (
544            ExecutionEventInner::TemporarilyFailed {
545                backoff_expires_at: expires_at,
546                reason_full: StrVariant::from(reason_full),
547                reason_inner: StrVariant::from(reason_inner),
548                detail,
549                http_client_traces,
550            },
551            None,
552            version,
553        )
554    } else {
555        info!("Activity with `{err_kind_for_logs}` marked as permanent failure - {reason_inner}");
556        let result = Err(FinishedExecutionError::PermanentFailure {
557            reason_inner,
558            reason_full,
559            kind: PermanentFailureKind::ActivityTrap,
560            detail,
561        });
562        let child_finished =
563            parent.map(
564                |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
565                    parent_execution_id,
566                    parent_join_set,
567                    result: result.clone(),
568                },
569            );
570        (
571            ExecutionEventInner::Finished {
572                result,
573                http_client_traces,
574            },
575            child_finished,
576            version,
577        )
578    }
579}
580
581#[derive(Debug, Clone)]
582pub(crate) struct ChildFinishedResponse {
583    pub(crate) parent_execution_id: ExecutionId,
584    pub(crate) parent_join_set: JoinSetId,
585    pub(crate) result: FinishedExecutionResult,
586}
587
588#[derive(Debug, Clone)]
589pub(crate) struct Append {
590    pub(crate) created_at: DateTime<Utc>,
591    pub(crate) primary_event: AppendRequest,
592    pub(crate) execution_id: ExecutionId,
593    pub(crate) version: Version,
594    pub(crate) child_finished: Option<ChildFinishedResponse>,
595}
596
597impl Append {
598    pub(crate) async fn append(self, db_connection: &dyn DbConnection) -> Result<(), DbError> {
599        if let Some(child_finished) = self.child_finished {
600            assert_matches!(
601                &self.primary_event,
602                AppendRequest {
603                    event: ExecutionEventInner::Finished { .. },
604                    ..
605                }
606            );
607            let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
608            db_connection
609                .append_batch_respond_to_parent(
610                    derived.clone(),
611                    self.created_at,
612                    vec![self.primary_event],
613                    self.version.clone(),
614                    child_finished.parent_execution_id,
615                    JoinSetResponseEventOuter {
616                        created_at: self.created_at,
617                        event: JoinSetResponseEvent {
618                            join_set_id: child_finished.parent_join_set,
619                            event: JoinSetResponse::ChildExecutionFinished {
620                                child_execution_id: derived,
621                                // Since self.primary_event is a finished event, the version will remain the same.
622                                finished_version: self.version,
623                                result: child_finished.result,
624                            },
625                        },
626                    },
627                )
628                .await?;
629        } else {
630            db_connection
631                .append_batch(
632                    self.created_at,
633                    vec![self.primary_event],
634                    self.execution_id,
635                    self.version,
636                )
637                .await?;
638        }
639        Ok(())
640    }
641}
642
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");
    /// Scripted results keyed by the version the worker expects to be called with.
    /// `run` pops from the end, so entries are consumed in reverse insertion order.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    /// Test worker replaying scripted `WorkerResult`s while asserting the context it gets.
    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    /// Metadata of a single submittable function with no parameters, return type or extension.
    fn single_export(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
        [FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: None,
            extension: None,
            submittable: true,
        }]
    }

    impl SimpleWorker {
        /// Worker expecting one call at version 2 with an empty event history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let scripted = IndexMap::from([(Version::new(2), (vec![], res))]);
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(scripted)))
        }

        /// Same scripted results, but exporting (and reporting) `ffqn` instead.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            Self {
                exported: single_export(ffqn.clone()),
                ffqn,
                worker_results_rev: self.worker_results_rev,
            }
        }

        /// Worker exporting `FFQN_SOME` and replaying `worker_results_rev`.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: single_export(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        /// Pop the last scripted entry, assert the context matches it, and return its result.
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            // Kept as one statement: the lock guard lives until the `unwrap`s complete.
            let (expected_version, (expected_eh, worker_result)) =
                self.worker_results_rev.lock().unwrap().pop().unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}
722
723#[cfg(test)]
724mod tests {
725    use self::simple_worker::SimpleWorker;
726    use super::*;
727    use crate::worker::FatalError;
728    use crate::{expired_timers_watcher, worker::WorkerResult};
729    use assert_matches::assert_matches;
730    use async_trait::async_trait;
731    use concepts::storage::{CreateRequest, JoinSetRequest};
732    use concepts::storage::{ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState};
733    use concepts::time::Now;
734    use concepts::{
735        ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, StrVariant,
736        SupportedFunctionReturnValue, TrapKind,
737    };
738    use db_tests::Database;
739    use indexmap::IndexMap;
740    use simple_worker::FFQN_SOME;
741    use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
742    use test_utils::set_up;
743    use test_utils::sim_clock::{ConstClock, SimClock};
744
745    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
746
747    async fn tick_fn<W: Worker + Debug, C: ClockFn + 'static>(
748        config: ExecConfig,
749        clock_fn: C,
750        db_pool: Arc<dyn DbPool>,
751        worker: Arc<W>,
752        executed_at: DateTime<Utc>,
753    ) -> ExecutionProgress {
754        trace!("Ticking with {worker:?}");
755        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
756        let executor = ExecTask::new(worker, config, clock_fn, db_pool, ffqns);
757        let mut execution_progress = executor.tick(executed_at, RunId::generate()).await.unwrap();
758        loop {
759            execution_progress
760                .executions
761                .retain(|(_, abort_handle)| !abort_handle.is_finished());
762            if execution_progress.executions.is_empty() {
763                return execution_progress;
764            }
765            tokio::time::sleep(Duration::from_millis(10)).await;
766        }
767    }
768
769    #[tokio::test]
770    async fn execute_simple_lifecycle_tick_based_mem() {
771        let created_at = Now.now();
772        let (_guard, db_pool) = Database::Memory.set_up().await;
773        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
774        db_pool.close().await.unwrap();
775    }
776
777    #[tokio::test]
778    async fn execute_simple_lifecycle_tick_based_sqlite() {
779        let created_at = Now.now();
780        let (_guard, db_pool) = Database::Sqlite.set_up().await;
781        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
782        db_pool.close().await.unwrap();
783    }
784
785    async fn execute_simple_lifecycle_tick_based<C: ClockFn + 'static>(
786        pool: Arc<dyn DbPool>,
787        clock_fn: C,
788    ) {
789        set_up();
790        let created_at = clock_fn.now();
791        let exec_config = ExecConfig {
792            batch_size: 1,
793            lock_expiry: Duration::from_secs(1),
794            tick_sleep: Duration::from_millis(100),
795            component_id: ComponentId::dummy_activity(),
796            task_limiter: None,
797            executor_id: ExecutorId::generate(),
798        };
799
800        let execution_log = create_and_tick(
801            CreateAndTickConfig {
802                execution_id: ExecutionId::generate(),
803                created_at,
804                max_retries: 0,
805                executed_at: created_at,
806                retry_exp_backoff: Duration::ZERO,
807            },
808            clock_fn,
809            pool,
810            exec_config,
811            Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
812                SupportedFunctionReturnValue::None,
813                Version::new(2),
814                None,
815            ))),
816            tick_fn,
817        )
818        .await;
819        assert_matches!(
820            execution_log.events.get(2).unwrap(),
821            ExecutionEvent {
822                event: ExecutionEventInner::Finished {
823                    result: Ok(SupportedFunctionReturnValue::None),
824                    http_client_traces: None
825                },
826                created_at: _,
827                backtrace_id: None,
828            }
829        );
830    }
831
832    #[tokio::test]
833    async fn stochastic_execute_simple_lifecycle_task_based_mem() {
834        set_up();
835        let created_at = Now.now();
836        let clock_fn = ConstClock(created_at);
837        let (_guard, db_pool) = Database::Memory.set_up().await;
838        let exec_config = ExecConfig {
839            batch_size: 1,
840            lock_expiry: Duration::from_secs(1),
841            tick_sleep: Duration::ZERO,
842            component_id: ComponentId::dummy_activity(),
843            task_limiter: None,
844            executor_id: ExecutorId::generate(),
845        };
846
847        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
848            SupportedFunctionReturnValue::None,
849            Version::new(2),
850            None,
851        )));
852        let exec_task = ExecTask::spawn_new(
853            worker.clone(),
854            exec_config.clone(),
855            clock_fn,
856            db_pool.clone(),
857        );
858
859        let execution_log = create_and_tick(
860            CreateAndTickConfig {
861                execution_id: ExecutionId::generate(),
862                created_at,
863                max_retries: 0,
864                executed_at: created_at,
865                retry_exp_backoff: Duration::ZERO,
866            },
867            clock_fn,
868            db_pool.clone(),
869            exec_config,
870            worker,
871            |_, _, _, _, _| async {
872                tokio::time::sleep(Duration::from_secs(1)).await; // FIXME: non determinism, possible race
873                ExecutionProgress::default()
874            },
875        )
876        .await;
877        exec_task.close().await;
878        db_pool.close().await.unwrap();
879        assert_matches!(
880            execution_log.events.get(2).unwrap(),
881            ExecutionEvent {
882                event: ExecutionEventInner::Finished {
883                    result: Ok(SupportedFunctionReturnValue::None),
884                    http_client_traces: None
885                },
886                created_at: _,
887                backtrace_id: None,
888            }
889        );
890    }
891
892    struct CreateAndTickConfig {
893        execution_id: ExecutionId,
894        created_at: DateTime<Utc>,
895        max_retries: u32,
896        executed_at: DateTime<Utc>,
897        retry_exp_backoff: Duration,
898    }
899
900    async fn create_and_tick<
901        W: Worker,
902        C: ClockFn,
903        T: FnMut(ExecConfig, C, Arc<dyn DbPool>, Arc<W>, DateTime<Utc>) -> F,
904        F: Future<Output = ExecutionProgress>,
905    >(
906        config: CreateAndTickConfig,
907        clock_fn: C,
908        db_pool: Arc<dyn DbPool>,
909        exec_config: ExecConfig,
910        worker: Arc<W>,
911        mut tick: T,
912    ) -> ExecutionLog {
913        // Create an execution
914        let db_connection = db_pool.connection();
915        db_connection
916            .create(CreateRequest {
917                created_at: config.created_at,
918                execution_id: config.execution_id.clone(),
919                ffqn: FFQN_SOME,
920                params: Params::empty(),
921                parent: None,
922                metadata: concepts::ExecutionMetadata::empty(),
923                scheduled_at: config.created_at,
924                retry_exp_backoff: config.retry_exp_backoff,
925                max_retries: config.max_retries,
926                component_id: ComponentId::dummy_activity(),
927                scheduled_by: None,
928            })
929            .await
930            .unwrap();
931        // execute!
932        tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
933        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
934        debug!("Execution history after tick: {execution_log:?}");
935        // check that DB contains Created and Locked events.
936        assert_matches!(
937            execution_log.events.first().unwrap(),
938            ExecutionEvent {
939                event: ExecutionEventInner::Created { .. },
940                created_at: actually_created_at,
941                backtrace_id: None,
942            }
943            if config.created_at == *actually_created_at
944        );
945        let locked_at = assert_matches!(
946            execution_log.events.get(1).unwrap(),
947            ExecutionEvent {
948                event: ExecutionEventInner::Locked { .. },
949                created_at: locked_at,
950                backtrace_id: None,
951            } if config.created_at <= *locked_at
952            => *locked_at
953        );
954        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
955            event: _,
956            created_at: executed_at,
957            backtrace_id: None,
958        } if *executed_at >= locked_at);
959        execution_log
960    }
961
962    #[tokio::test]
963    async fn activity_trap_should_trigger_an_execution_retry() {
964        set_up();
965        let sim_clock = SimClock::default();
966        let (_guard, db_pool) = Database::Memory.set_up().await;
967        let exec_config = ExecConfig {
968            batch_size: 1,
969            lock_expiry: Duration::from_secs(1),
970            tick_sleep: Duration::ZERO,
971            component_id: ComponentId::dummy_activity(),
972            task_limiter: None,
973            executor_id: ExecutorId::generate(),
974        };
975        let expected_reason = "error reason";
976        let expected_detail = "error detail";
977        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
978            WorkerError::ActivityTrap {
979                reason: expected_reason.to_string(),
980                trap_kind: concepts::TrapKind::Trap,
981                detail: Some(expected_detail.to_string()),
982                version: Version::new(2),
983                http_client_traces: None,
984            },
985        )));
986        let retry_exp_backoff = Duration::from_millis(100);
987        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
988        let execution_log = create_and_tick(
989            CreateAndTickConfig {
990                execution_id: ExecutionId::generate(),
991                created_at: sim_clock.now(),
992                max_retries: 1,
993                executed_at: sim_clock.now(),
994                retry_exp_backoff,
995            },
996            sim_clock.clone(),
997            db_pool.clone(),
998            exec_config.clone(),
999            worker,
1000            tick_fn,
1001        )
1002        .await;
1003        assert_eq!(3, execution_log.events.len());
1004        {
1005            let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
1006                &execution_log.events.get(2).unwrap(),
1007                ExecutionEvent {
1008                    event: ExecutionEventInner::TemporarilyFailed {
1009                        reason_inner,
1010                        reason_full,
1011                        detail,
1012                        backoff_expires_at,
1013                        http_client_traces: None,
1014                    },
1015                    created_at: at,
1016                    backtrace_id: None,
1017                }
1018                => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
1019            );
1020            assert_eq!(expected_reason, reason_inner.deref());
1021            assert_eq!(
1022                format!("activity trap: {expected_reason}"),
1023                reason_full.deref()
1024            );
1025            assert_eq!(Some(expected_detail), detail.as_deref());
1026            assert_eq!(at, sim_clock.now());
1027            assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
1028        }
1029        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
1030            std::sync::Mutex::new(IndexMap::from([(
1031                Version::new(4),
1032                (
1033                    vec![],
1034                    WorkerResult::Ok(SupportedFunctionReturnValue::None, Version::new(4), None),
1035                ),
1036            )])),
1037        )));
1038        // noop until `retry_exp_backoff` expires
1039        assert!(
1040            tick_fn(
1041                exec_config.clone(),
1042                sim_clock.clone(),
1043                db_pool.clone(),
1044                worker.clone(),
1045                sim_clock.now(),
1046            )
1047            .await
1048            .executions
1049            .is_empty()
1050        );
1051        // tick again to finish the execution
1052        sim_clock.move_time_forward(retry_exp_backoff);
1053        tick_fn(
1054            exec_config,
1055            sim_clock.clone(),
1056            db_pool.clone(),
1057            worker,
1058            sim_clock.now(),
1059        )
1060        .await;
1061        let execution_log = {
1062            let db_connection = db_pool.connection();
1063            db_connection
1064                .get(&execution_log.execution_id)
1065                .await
1066                .unwrap()
1067        };
1068        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
1069        assert_matches!(
1070            execution_log.events.get(3).unwrap(),
1071            ExecutionEvent {
1072                event: ExecutionEventInner::Locked { .. },
1073                created_at: at,
1074                backtrace_id: None,
1075            } if *at == sim_clock.now()
1076        );
1077        assert_matches!(
1078            execution_log.events.get(4).unwrap(),
1079            ExecutionEvent {
1080                event: ExecutionEventInner::Finished {
1081                    result: Ok(SupportedFunctionReturnValue::None),
1082                    http_client_traces: None
1083                },
1084                created_at: finished_at,
1085                backtrace_id: None,
1086            } if *finished_at == sim_clock.now()
1087        );
1088        db_pool.close().await.unwrap();
1089    }
1090
1091    #[tokio::test]
1092    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1093        set_up();
1094        let created_at = Now.now();
1095        let clock_fn = ConstClock(created_at);
1096        let (_guard, db_pool) = Database::Memory.set_up().await;
1097        let exec_config = ExecConfig {
1098            batch_size: 1,
1099            lock_expiry: Duration::from_secs(1),
1100            tick_sleep: Duration::ZERO,
1101            component_id: ComponentId::dummy_activity(),
1102            task_limiter: None,
1103            executor_id: ExecutorId::generate(),
1104        };
1105
1106        let expected_reason = "error reason";
1107        let expected_detail = "error detail";
1108        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1109            WorkerError::ActivityTrap {
1110                reason: expected_reason.to_string(),
1111                trap_kind: concepts::TrapKind::Trap,
1112                detail: Some(expected_detail.to_string()),
1113                version: Version::new(2),
1114                http_client_traces: None,
1115            },
1116        )));
1117        let execution_log = create_and_tick(
1118            CreateAndTickConfig {
1119                execution_id: ExecutionId::generate(),
1120                created_at,
1121                max_retries: 0,
1122                executed_at: created_at,
1123                retry_exp_backoff: Duration::ZERO,
1124            },
1125            clock_fn,
1126            db_pool.clone(),
1127            exec_config.clone(),
1128            worker,
1129            tick_fn,
1130        )
1131        .await;
1132        assert_eq!(3, execution_log.events.len());
1133        let (reason, kind, detail) = assert_matches!(
1134            &execution_log.events.get(2).unwrap(),
1135            ExecutionEvent {
1136                event: ExecutionEventInner::Finished{
1137                    result: Err(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_}),
1138                    http_client_traces: None
1139                },
1140                created_at: at,
1141                backtrace_id: None,
1142            } if *at == created_at
1143            => (reason_inner, kind, detail)
1144        );
1145        assert_eq!(expected_reason, *reason);
1146        assert_eq!(Some(expected_detail), detail.as_deref());
1147        assert_eq!(PermanentFailureKind::ActivityTrap, *kind);
1148
1149        db_pool.close().await.unwrap();
1150    }
1151
1152    #[tokio::test]
1153    async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1154        let worker_error = WorkerError::ActivityTrap {
1155            reason: "error reason".to_string(),
1156            trap_kind: TrapKind::Trap,
1157            detail: Some("detail".to_string()),
1158            version: Version::new(2),
1159            http_client_traces: None,
1160        };
1161        let expected_child_err = FinishedExecutionError::PermanentFailure {
1162            reason_full: "activity trap: error reason".to_string(),
1163            reason_inner: "error reason".to_string(),
1164            kind: PermanentFailureKind::ActivityTrap,
1165            detail: Some("detail".to_string()),
1166        };
1167        child_execution_permanently_failed_should_notify_parent(
1168            WorkerResult::Err(worker_error),
1169            expected_child_err,
1170        )
1171        .await;
1172    }
1173
1174    // TODO: Add test for WorkerError::TemporaryTimeout
1175
1176    #[tokio::test]
1177    async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1178        let expected_child_err = FinishedExecutionError::PermanentTimeout;
1179        child_execution_permanently_failed_should_notify_parent(
1180            WorkerResult::DbUpdatedByWorkerOrWatcher,
1181            expected_child_err,
1182        )
1183        .await;
1184    }
1185
1186    #[tokio::test]
1187    async fn child_execution_permanently_failed_should_notify_parent_unhandled_child() {
1188        let parent_id = ExecutionId::from_parts(1, 1);
1189        let join_set_id_outer =
1190            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("outer")).unwrap();
1191        let root_cause_id = parent_id.next_level(&join_set_id_outer);
1192        let join_set_id_inner =
1193            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("inner")).unwrap();
1194        let child_execution_id = root_cause_id.next_level(&join_set_id_inner);
1195        let worker_error = WorkerError::FatalError(
1196            FatalError::UnhandledChildExecutionError {
1197                child_execution_id: child_execution_id.clone(),
1198                root_cause_id: root_cause_id.clone(),
1199            },
1200            Version::new(2),
1201        );
1202        let expected_child_err = FinishedExecutionError::UnhandledChildExecutionError {
1203            child_execution_id,
1204            root_cause_id,
1205        };
1206        child_execution_permanently_failed_should_notify_parent(
1207            WorkerResult::Err(worker_error),
1208            expected_child_err,
1209        )
1210        .await;
1211    }
1212
    /// Shared scenario for the `child_execution_permanently_failed_should_notify_parent_*` tests.
    ///
    /// Creates and ticks a parent execution, then appends what the parent's worker would have
    /// written: a one-off join set, a child execution request and a `JoinNext`, created
    /// together with the child execution itself. The child is then ticked with a worker
    /// returning `worker_result`. When `expected_child_err` is `PermanentTimeout`, the clock is
    /// advanced by the lock expiry and the expired timers watcher is ticked.
    ///
    /// Asserts that:
    /// - the child finished with `expected_child_err` at version 2;
    /// - the parent is pending again at the current time;
    /// - the parent's last response is a `ChildExecutionFinished` for the child, carrying an
    ///   error result.
    async fn child_execution_permanently_failed_should_notify_parent(
        worker_result: WorkerResult,
        expected_child_err: FinishedExecutionError,
    ) {
        use concepts::storage::JoinSetResponseEventOuter;
        const LOCK_EXPIRY: Duration = Duration::from_secs(1);

        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;

        // The parent's worker only returns `DbUpdatedByWorkerOrWatcher`; the child-related
        // history is appended manually below.
        let parent_worker = Arc::new(SimpleWorker::with_single_result(
            WorkerResult::DbUpdatedByWorkerOrWatcher,
        ));
        let parent_execution_id = ExecutionId::generate();
        db_pool
            .connection()
            .create(CreateRequest {
                created_at: sim_clock.now(),
                execution_id: parent_execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: Duration::ZERO,
                max_retries: 0,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();
        // Tick the parent once so that it gets locked and run.
        tick_fn(
            ExecConfig {
                batch_size: 1,
                lock_expiry: LOCK_EXPIRY,
                tick_sleep: Duration::ZERO,
                component_id: ComponentId::dummy_activity(),
                task_limiter: None,
                executor_id: ExecutorId::generate(),
            },
            sim_clock.clone(),
            db_pool.clone(),
            parent_worker,
            sim_clock.now(),
        )
        .await;

        let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
        let child_execution_id = parent_execution_id.next_level(&join_set_id);
        // executor does not append anything, this should have been written by the worker:
        {
            // The child execution, pointing back at the parent and its join set.
            let child = CreateRequest {
                created_at: sim_clock.now(),
                execution_id: ExecutionId::Derived(child_execution_id.clone()),
                ffqn: FFQN_CHILD,
                params: Params::empty(),
                parent: Some((parent_execution_id.clone(), join_set_id.clone())),
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: Duration::ZERO,
                max_retries: 0,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            };
            let current_time = sim_clock.now();
            // Parent history: create the join set ...
            let join_set = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinSetCreate {
                        join_set_id: join_set_id.clone(),
                        closing_strategy: ClosingStrategy::Complete,
                    },
                },
            };
            // ... submit the child into it ...
            let child_exec_req = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinSetRequest {
                        join_set_id: join_set_id.clone(),
                        request: JoinSetRequest::ChildExecutionRequest {
                            child_execution_id: child_execution_id.clone(),
                        },
                    },
                },
            };
            // ... and wait for its result.
            let join_next = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinNext {
                        join_set_id: join_set_id.clone(),
                        run_expires_at: sim_clock.now(),
                        closing: false,
                        requested_ffqn: Some(FFQN_CHILD),
                    },
                },
            };
            // Append the parent events at version 2 and create the child in the same call.
            db_pool
                .connection()
                .append_batch_create_new_execution(
                    current_time,
                    vec![join_set, child_exec_req, join_next],
                    parent_execution_id.clone(),
                    Version::new(2),
                    vec![child],
                )
                .await
                .unwrap();
        }

        let child_worker =
            Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));

        // execute the child
        tick_fn(
            ExecConfig {
                batch_size: 1,
                lock_expiry: LOCK_EXPIRY,
                tick_sleep: Duration::ZERO,
                component_id: ComponentId::dummy_activity(),
                task_limiter: None,
                executor_id: ExecutorId::generate(),
            },
            sim_clock.clone(),
            db_pool.clone(),
            child_worker,
            sim_clock.now(),
        )
        .await;
        if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
            // In case of timeout, let the timers watcher handle it
            sim_clock.move_time_forward(LOCK_EXPIRY);
            expired_timers_watcher::tick(db_pool.connection().as_ref(), sim_clock.now())
                .await
                .unwrap();
        }
        // The child must have finished with exactly the expected error.
        let child_log = db_pool
            .connection()
            .get(&ExecutionId::Derived(child_execution_id.clone()))
            .await
            .unwrap();
        assert!(child_log.pending_state.is_finished());
        assert_eq!(
            Version(2),
            child_log.next_version,
            "created = 0, locked = 1, with_single_result = 2"
        );
        assert_eq!(
            ExecutionEventInner::Finished {
                result: Err(expected_child_err),
                http_client_traces: None
            },
            child_log.last_event().event
        );
        // The parent must be pending again and must have received the child's result.
        let parent_log = db_pool
            .connection()
            .get(&parent_execution_id)
            .await
            .unwrap();
        assert_matches!(
            parent_log.pending_state,
            PendingState::PendingAt {
                scheduled_at
            } if scheduled_at == sim_clock.now(),
            "parent should be back to pending"
        );
        let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
            parent_log.responses.last(),
            Some(JoinSetResponseEventOuter{
                created_at: at,
                event: JoinSetResponseEvent{
                    join_set_id: found_join_set_id,
                    event: JoinSetResponse::ChildExecutionFinished {
                        child_execution_id: found_child_execution_id,
                        finished_version,
                        result: found_result,
                    }
                }
            })
             if *at == sim_clock.now()
            => (found_join_set_id, found_child_execution_id, finished_version, found_result)
        );
        assert_eq!(join_set_id, *found_join_set_id);
        assert_eq!(child_execution_id, *found_child_execution_id);
        // The response records the child's version after it finished.
        assert_eq!(child_log.next_version, *child_finished_version);
        // NOTE(review): only checks that the forwarded result is an error, not that it equals
        // `expected_child_err`; consider tightening if the result type allows comparison.
        assert!(found_result.is_err());

        db_pool.close().await.unwrap();
    }
1402
    /// Test worker that sleeps for `duration` and then succeeds with a clone of `result`.
    #[derive(Clone, Debug)]
    struct SleepyWorker {
        /// How long `run` sleeps before returning.
        duration: Duration,
        /// Value returned on success.
        result: SupportedFunctionReturnValue,
        /// Functions reported by `exported_functions`.
        exported: [FunctionMetadata; 1],
    }
1409
1410    #[async_trait]
1411    impl Worker for SleepyWorker {
1412        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1413            tokio::time::sleep(self.duration).await;
1414            WorkerResult::Ok(self.result.clone(), ctx.version, None)
1415        }
1416
1417        fn exported_functions(&self) -> &[FunctionMetadata] {
1418            &self.exported
1419        }
1420    }
1421
    /// An execution whose worker keeps running past the lock expiry must have its lock
    /// expired by the timers watcher. It is then marked `TemporarilyTimedOut` with a backoff of
    /// `timeout_duration` and retried after that backoff; the retry hangs the same way.
    ///
    /// NOTE(review): `SleepyWorker` sleeps via `tokio::time::sleep` (presumably real time,
    /// the runtime is not paused here) for `lock_expiry + 1ms`, while the test advances a
    /// `SimClock`. The `!is_finished()` assertions therefore assume each of them is reached
    /// within roughly 100ms of real time after the corresponding tick. This may be flaky
    /// on a slow machine; confirm.
    /// NOTE(review): the final state after the second expiry (a permanent timeout, per the
    /// debug message below) is not asserted.
    #[tokio::test]
    async fn hanging_lock_should_be_cleaned_and_execution_retried() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let lock_expiry = Duration::from_millis(100);
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry,
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
        };

        let worker = Arc::new(SleepyWorker {
            duration: lock_expiry + Duration::from_millis(1), // sleep more than allowed by the lock expiry
            result: SupportedFunctionReturnValue::None,
            exported: [FunctionMetadata {
                ffqn: FFQN_SOME,
                parameter_types: ParameterTypes::default(),
                return_type: None,
                extension: None,
                submittable: true,
            }],
        });
        // Create an execution
        let execution_id = ExecutionId::generate();
        // Used as `retry_exp_backoff`: the retry becomes pending this long after the timeout.
        let timeout_duration = Duration::from_millis(300);
        let db_connection = db_pool.connection();
        db_connection
            .create(CreateRequest {
                created_at: sim_clock.now(),
                execution_id: execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: timeout_duration,
                max_retries: 1,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();

        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
        let executor = ExecTask::new(
            worker,
            exec_config,
            sim_clock.clone(),
            db_pool.clone(),
            ffqns,
        );
        // First tick locks the execution and spawns one (hanging) worker task.
        let mut first_execution_progress = executor
            .tick(sim_clock.now(), RunId::generate())
            .await
            .unwrap();
        assert_eq!(1, first_execution_progress.executions.len());
        // Started hanging, wait for lock expiry.
        sim_clock.move_time_forward(lock_expiry);
        // cleanup should be called
        let now_after_first_lock_expiry = sim_clock.now();
        {
            debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
            // The executor must not spawn anything for the still-locked execution.
            let cleanup_progress = executor
                .tick(now_after_first_lock_expiry, RunId::generate())
                .await
                .unwrap();
            assert!(cleanup_progress.executions.is_empty());
        }
        {
            // The timers watcher is what expires the lock.
            let expired_locks = expired_timers_watcher::tick(
                db_pool.connection().as_ref(),
                now_after_first_lock_expiry,
            )
            .await
            .unwrap()
            .expired_locks;
            assert_eq!(1, expired_locks);
        }
        // The first worker task has not completed yet; only its lock expired.
        assert!(
            !first_execution_progress
                .executions
                .pop()
                .unwrap()
                .1
                .is_finished()
        );

        let execution_log = db_connection.get(&execution_id).await.unwrap();
        let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
        assert_matches!(
            &execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
                created_at: at,
                backtrace_id: None,
            } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
        );
        assert_eq!(
            PendingState::PendingAt {
                scheduled_at: expected_first_timeout_expiry
            },
            execution_log.pending_state
        );
        sim_clock.move_time_forward(timeout_duration);
        let now_after_first_timeout = sim_clock.now();
        debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");

        // After the backoff the execution is locked again and a second worker task spawned.
        let mut second_execution_progress = executor
            .tick(now_after_first_timeout, RunId::generate())
            .await
            .unwrap();
        assert_eq!(1, second_execution_progress.executions.len());

        // Started hanging, wait for lock expiry.
        sim_clock.move_time_forward(lock_expiry);
        // cleanup should be called
        let now_after_second_lock_expiry = sim_clock.now();
        debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
        {
            let cleanup_progress = executor
                .tick(now_after_second_lock_expiry, RunId::generate())
                .await
                .unwrap();
            assert!(cleanup_progress.executions.is_empty());
        }
        {
            let expired_locks = expired_timers_watcher::tick(
                db_pool.connection().as_ref(),
                now_after_second_lock_expiry,
            )
            .await
            .unwrap()
            .expired_locks;
            assert_eq!(1, expired_locks);
        }
        assert!(
            !second_execution_progress
                .executions
                .pop()
                .unwrap()
                .1
                .is_finished()
        );

        // Release the connection and executor before closing the pool.
        drop(db_connection);
        drop(executor);
        db_pool.close().await.unwrap();
    }
1574}