1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::prefixed_ulid::RunId;
5use concepts::storage::{
6 AppendEventsToExecution, AppendRequest, AppendResponseToExecution, DbErrorGeneric,
7 DbErrorWrite, DbExecutor, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
8 LockedExecution,
9};
10use concepts::time::{ClockFn, Sleep};
11use concepts::{
12 ComponentId, ComponentRetryConfig, FunctionMetadata, StrVariant, SupportedFunctionReturnValue,
13};
14use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
15use concepts::{
16 FinishedExecutionError,
17 storage::{ExecutionEventInner, JoinSetResponse, Version},
18};
19use concepts::{JoinSetId, PermanentFailureKind};
20use std::fmt::Display;
21use std::{
22 sync::{
23 Arc,
24 atomic::{AtomicBool, Ordering},
25 },
26 time::Duration,
27};
28use tokio::task::{AbortHandle, JoinHandle};
29use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
30
/// Configuration of a single executor.
#[derive(Debug, Clone)]
pub struct ExecConfig {
    /// Added to a tick's `executed_at` to obtain both the lock expiry handed to the database
    /// and the execution deadline passed to the worker.
    pub lock_expiry: Duration,
    /// Duration slept (through the supplied `Sleep`) inside the future handed to
    /// `wait_for_pending` by the spawned executor loop.
    pub tick_sleep: Duration,
    /// Upper bound on the number of permits, and therefore executions, requested per tick.
    pub batch_size: u32,
    /// Component identifier forwarded to `lock_pending` and recorded in tracing fields.
    pub component_id: ComponentId,
    /// Optional semaphore. When present, a tick requests only as many executions as permits it
    /// could acquire, and each spawned worker task holds its permit until the task ends.
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
    /// Executor identifier forwarded to `lock_pending` and recorded in tracing fields.
    pub executor_id: ExecutorId,
    /// Retry settings forwarded to `lock_pending` and used to decide whether a failed run may
    /// be retried.
    pub retry_config: ComponentRetryConfig,
}
41
/// Executor state: the worker to run, its configuration, a clock, the database handle and the
/// function names this executor locks executions for.
pub struct ExecTask<C: ClockFn> {
    worker: Arc<dyn Worker>,
    config: ExecConfig,
    clock_fn: C,
    db_exec: Arc<dyn DbExecutor>,
    /// FFQNs handed to `lock_pending` and `wait_for_pending`.
    ffqns: Arc<[FunctionFqn]>,
}
49
/// Outcome of one tick: the locked executions paired with the join handles of the tasks that
/// were spawned to run them.
#[derive(derive_more::Debug, Default)]
pub struct ExecutionProgress {
    // Only read by `wait_for_tasks`, which is compiled with the `test` feature; hence the
    // `dead_code` allowance.
    #[debug(skip)]
    #[allow(dead_code)]
    executions: Vec<(ExecutionId, JoinHandle<()>)>,
}
56
57impl ExecutionProgress {
58 #[cfg(feature = "test")]
59 pub async fn wait_for_tasks(self) -> Vec<ExecutionId> {
60 let mut vec = Vec::new();
61 for (exe, join_handle) in self.executions {
62 vec.push(exe);
63 join_handle.await.unwrap();
64 }
65 vec
66 }
67}
68
/// Handle to a spawned executor task.
///
/// `close` requests a graceful stop; dropping the handle aborts the task if it is still running.
#[derive(derive_more::Debug)]
pub struct ExecutorTaskHandle {
    /// Flag shared with the executor loop; set to `true` by `close`.
    #[debug(skip)]
    is_closing: Arc<AtomicBool>,
    /// Abort handle of the spawned executor task.
    #[debug(skip)]
    abort_handle: AbortHandle,
    component_id: ComponentId,
    executor_id: ExecutorId,
}
78
79impl ExecutorTaskHandle {
80 #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
81 pub async fn close(&self) {
82 trace!("Gracefully closing");
83 self.is_closing.store(true, Ordering::Relaxed);
91 while !self.abort_handle.is_finished() {
92 tokio::time::sleep(Duration::from_millis(1)).await;
93 }
94 debug!("Gracefully closed");
95 }
96}
97
98impl Drop for ExecutorTaskHandle {
99 #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
100 fn drop(&mut self) {
101 if self.abort_handle.is_finished() {
102 return;
103 }
104 warn!("Aborting the executor task");
105 self.abort_handle.abort();
106 }
107}
108
/// Public entry point, compiled only with the `test` feature, that forwards to the private
/// `extract_exported_ffqns_noext`.
#[cfg(feature = "test")]
pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    extract_exported_ffqns_noext(worker)
}
113
114fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
115 worker
116 .exported_functions()
117 .iter()
118 .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
119 .collect::<Arc<_>>()
120}
121
122impl<C: ClockFn + 'static> ExecTask<C> {
123 #[cfg(feature = "test")]
124 pub fn new_test(
125 worker: Arc<dyn Worker>,
126 config: ExecConfig,
127 clock_fn: C,
128 db_exec: Arc<dyn DbExecutor>,
129 ffqns: Arc<[FunctionFqn]>,
130 ) -> Self {
131 Self {
132 worker,
133 config,
134 clock_fn,
135 db_exec,
136 ffqns,
137 }
138 }
139
140 #[cfg(feature = "test")]
141 pub fn new_all_ffqns_test(
142 worker: Arc<dyn Worker>,
143 config: ExecConfig,
144 clock_fn: C,
145 db_exec: Arc<dyn DbExecutor>,
146 ) -> Self {
147 let ffqns = extract_exported_ffqns_noext(worker.as_ref());
148 Self {
149 worker,
150 config,
151 clock_fn,
152 db_exec,
153 ffqns,
154 }
155 }
156
157 pub fn spawn_new(
158 worker: Arc<dyn Worker>,
159 config: ExecConfig,
160 clock_fn: C,
161 db_exec: Arc<dyn DbExecutor>,
162 sleep: impl Sleep + 'static,
163 ) -> ExecutorTaskHandle {
164 let is_closing = Arc::new(AtomicBool::default());
165 let is_closing_inner = is_closing.clone();
166 let ffqns = extract_exported_ffqns_noext(worker.as_ref());
167 let component_id = config.component_id.clone();
168 let executor_id = config.executor_id;
169 let abort_handle = tokio::spawn(async move {
170 debug!(executor_id = %config.executor_id, component_id = %config.component_id, "Spawned executor");
171 let task = ExecTask {
172 worker,
173 config,
174 db_exec,
175 ffqns: ffqns.clone(),
176 clock_fn: clock_fn.clone(),
177 };
178 while !is_closing_inner.load(Ordering::Relaxed) {
179 let _ = task.tick(clock_fn.now(), RunId::generate()).await;
180
181 task.db_exec
182 .wait_for_pending(clock_fn.now(), ffqns.clone(), {
183 let sleep = sleep.clone();
184 Box::pin(async move { sleep.sleep(task.config.tick_sleep).await })})
185 .await;
186 }
187 })
188 .abort_handle();
189 ExecutorTaskHandle {
190 is_closing,
191 abort_handle,
192 component_id,
193 executor_id,
194 }
195 }
196
197 fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
198 if let Some(task_limiter) = &self.config.task_limiter {
199 let mut locks = Vec::new();
200 for _ in 0..self.config.batch_size {
201 if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
202 locks.push(Some(permit));
203 } else {
204 break;
205 }
206 }
207 locks
208 } else {
209 let mut vec = Vec::with_capacity(self.config.batch_size as usize);
210 for _ in 0..self.config.batch_size {
211 vec.push(None);
212 }
213 vec
214 }
215 }
216
217 #[cfg(feature = "test")]
218 pub async fn tick_test(&self, executed_at: DateTime<Utc>, run_id: RunId) -> ExecutionProgress {
219 self.tick(executed_at, run_id).await.unwrap()
220 }
221
222 #[cfg(feature = "test")]
223 pub async fn tick_test_await(
224 &self,
225 executed_at: DateTime<Utc>,
226 run_id: RunId,
227 ) -> Vec<ExecutionId> {
228 self.tick(executed_at, run_id)
229 .await
230 .unwrap()
231 .wait_for_tasks()
232 .await
233 }
234
235 #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.config.executor_id, component_id = %self.config.component_id))]
236 async fn tick(
237 &self,
238 executed_at: DateTime<Utc>,
239 run_id: RunId,
240 ) -> Result<ExecutionProgress, DbErrorGeneric> {
241 let locked_executions = {
242 let mut permits = self.acquire_task_permits();
243 if permits.is_empty() {
244 return Ok(ExecutionProgress::default());
245 }
246 let lock_expires_at = executed_at + self.config.lock_expiry;
247 let locked_executions = self
248 .db_exec
249 .lock_pending(
250 permits.len(), executed_at, self.ffqns.clone(),
253 executed_at, self.config.component_id.clone(),
255 self.config.executor_id,
256 lock_expires_at,
257 run_id,
258 self.config.retry_config,
259 )
260 .await?;
261 while permits.len() > locked_executions.len() {
263 permits.pop();
264 }
265 assert_eq!(permits.len(), locked_executions.len());
266 locked_executions.into_iter().zip(permits)
267 };
268 let execution_deadline = executed_at + self.config.lock_expiry;
269
270 let mut executions = Vec::with_capacity(locked_executions.len());
271 for (locked_execution, permit) in locked_executions {
272 let execution_id = locked_execution.execution_id.clone();
273 let join_handle = {
274 let worker = self.worker.clone();
275 let db = self.db_exec.clone();
276 let clock_fn = self.clock_fn.clone();
277 let run_id = locked_execution.run_id;
278 let worker_span = info_span!(parent: None, "worker",
279 "otel.name" = format!("worker {}", locked_execution.ffqn),
280 %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.config.executor_id, component_id = %self.config.component_id);
281 locked_execution.metadata.enrich(&worker_span);
282 tokio::spawn({
283 let worker_span2 = worker_span.clone();
284 let retry_config = self.config.retry_config;
285 async move {
286 let _permit = permit;
287 let res = Self::run_worker(
288 worker,
289 db.as_ref(),
290 execution_deadline,
291 clock_fn,
292 locked_execution,
293 retry_config,
294 worker_span2,
295 )
296 .await;
297 if let Err(db_error) = res {
298 error!("Got db error `{db_error:?}`, expecting watcher to mark execution as timed out");
299 }
300 }
301 .instrument(worker_span)
302 })
303 };
304 executions.push((execution_id, join_handle));
305 }
306 Ok(ExecutionProgress { executions })
307 }
308
309 async fn run_worker(
310 worker: Arc<dyn Worker>,
311 db_exec: &dyn DbExecutor,
312 execution_deadline: DateTime<Utc>,
313 clock_fn: C,
314 locked_execution: LockedExecution,
315 retry_config: ComponentRetryConfig,
316 worker_span: Span,
317 ) -> Result<(), DbErrorWrite> {
318 debug!("Worker::run starting");
319 trace!(
320 version = %locked_execution.next_version,
321 params = ?locked_execution.params,
322 event_history = ?locked_execution.event_history,
323 "Worker::run starting"
324 );
325 let can_be_retried = ExecutionLog::can_be_retried_after(
326 locked_execution.intermittent_event_count + 1,
327 retry_config.max_retries,
328 retry_config.retry_exp_backoff,
329 );
330 let unlock_expiry_on_limit_reached =
331 ExecutionLog::compute_retry_duration_when_retrying_forever(
332 locked_execution.intermittent_event_count + 1,
333 retry_config.retry_exp_backoff,
334 );
335 let ctx = WorkerContext {
336 execution_id: locked_execution.execution_id.clone(),
337 metadata: locked_execution.metadata,
338 ffqn: locked_execution.ffqn,
339 params: locked_execution.params,
340 event_history: locked_execution.event_history,
341 responses: locked_execution
342 .responses
343 .into_iter()
344 .map(|outer| outer.event)
345 .collect(),
346 version: locked_execution.next_version,
347 execution_deadline,
348 can_be_retried: can_be_retried.is_some(),
349 run_id: locked_execution.run_id,
350 worker_span,
351 };
352 let worker_result = worker.run(ctx).await;
353 trace!(?worker_result, "Worker::run finished");
354 let result_obtained_at = clock_fn.now();
355 match Self::worker_result_to_execution_event(
356 locked_execution.execution_id,
357 worker_result,
358 result_obtained_at,
359 locked_execution.parent,
360 can_be_retried,
361 unlock_expiry_on_limit_reached,
362 )? {
363 Some(append) => {
364 trace!("Appending {append:?}");
365 append.append(db_exec).await
366 }
367 None => Ok(()),
368 }
369 }
370
371 fn worker_result_to_execution_event(
373 execution_id: ExecutionId,
374 worker_result: WorkerResult,
375 result_obtained_at: DateTime<Utc>,
376 parent: Option<(ExecutionId, JoinSetId)>,
377 can_be_retried: Option<Duration>,
378 unlock_expiry_on_limit_reached: Duration,
379 ) -> Result<Option<Append>, DbErrorWrite> {
380 Ok(match worker_result {
381 WorkerResult::Ok(result, new_version, http_client_traces) => {
382 info!(
383 "Execution finished: {}",
384 result.as_pending_state_finished_result()
385 );
386 let child_finished =
387 parent.map(
388 |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
389 parent_execution_id,
390 parent_join_set,
391 result: result.clone(),
392 },
393 );
394 let primary_event = AppendRequest {
395 created_at: result_obtained_at,
396 event: ExecutionEventInner::Finished {
397 result,
398 http_client_traces,
399 },
400 };
401
402 Some(Append {
403 created_at: result_obtained_at,
404 primary_event,
405 execution_id,
406 version: new_version,
407 child_finished,
408 })
409 }
410 WorkerResult::DbUpdatedByWorkerOrWatcher => None,
411 WorkerResult::Err(err) => {
412 let reason_full = err.to_string(); let (primary_event, child_finished, version) = match err {
414 WorkerError::TemporaryTimeout {
415 http_client_traces,
416 version,
417 } => {
418 if let Some(duration) = can_be_retried {
419 let backoff_expires_at = result_obtained_at + duration;
420 info!(
421 "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
422 );
423 (
424 ExecutionEventInner::TemporarilyTimedOut {
425 backoff_expires_at,
426 http_client_traces,
427 },
428 None,
429 version,
430 )
431 } else {
432 info!("Permanent timeout");
433 let result = SupportedFunctionReturnValue::ExecutionError(
434 FinishedExecutionError::PermanentTimeout,
435 );
436 let child_finished =
437 parent.map(|(parent_execution_id, parent_join_set)| {
438 ChildFinishedResponse {
439 parent_execution_id,
440 parent_join_set,
441 result: result.clone(),
442 }
443 });
444 (
445 ExecutionEventInner::Finished {
446 result,
447 http_client_traces,
448 },
449 child_finished,
450 version,
451 )
452 }
453 }
454 WorkerError::DbError(db_error) => {
455 return Err(db_error);
456 }
457 WorkerError::ActivityTrap {
458 reason: reason_inner,
459 trap_kind,
460 detail,
461 version,
462 http_client_traces,
463 } => retry_or_fail(
464 result_obtained_at,
465 parent,
466 can_be_retried,
467 reason_full,
468 reason_inner,
469 trap_kind, detail,
471 version,
472 http_client_traces,
473 ),
474 WorkerError::ActivityPreopenedDirError {
475 reason_kind,
476 reason_inner,
477 version,
478 } => retry_or_fail(
479 result_obtained_at,
480 parent,
481 can_be_retried,
482 reason_full,
483 reason_inner,
484 reason_kind, None, version,
487 None, ),
489 WorkerError::ActivityReturnedError {
490 detail,
491 version,
492 http_client_traces,
493 } => {
494 let duration = can_be_retried.expect(
495 "ActivityReturnedError must not be returned when retries are exhausted",
496 );
497 let expires_at = result_obtained_at + duration;
498 debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
499 (
500 ExecutionEventInner::TemporarilyFailed {
501 backoff_expires_at: expires_at,
502 reason_full: StrVariant::Static("activity returned error"), reason_inner: StrVariant::Static("activity returned error"),
504 detail, http_client_traces,
506 },
507 None,
508 version,
509 )
510 }
511 WorkerError::LimitReached {
512 reason: inner_reason,
513 version: new_version,
514 } => {
515 let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
516 warn!(
517 "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
518 );
519 (
520 ExecutionEventInner::Unlocked {
521 backoff_expires_at: expires_at,
522 reason: StrVariant::from(reason_full),
523 },
524 None,
525 new_version,
526 )
527 }
528 WorkerError::FatalError(fatal_error, version) => {
529 warn!("Fatal worker error - {fatal_error:?}");
530 let result = SupportedFunctionReturnValue::ExecutionError(
531 FinishedExecutionError::from(fatal_error),
532 );
533 let child_finished =
534 parent.map(|(parent_execution_id, parent_join_set)| {
535 ChildFinishedResponse {
536 parent_execution_id,
537 parent_join_set,
538 result: result.clone(),
539 }
540 });
541 (
542 ExecutionEventInner::Finished {
543 result,
544 http_client_traces: None,
545 },
546 child_finished,
547 version,
548 )
549 }
550 };
551 Some(Append {
552 created_at: result_obtained_at,
553 primary_event: AppendRequest {
554 created_at: result_obtained_at,
555 event: primary_event,
556 },
557 execution_id,
558 version,
559 child_finished,
560 })
561 }
562 })
563 }
564}
565
566#[expect(clippy::too_many_arguments)]
567fn retry_or_fail(
568 result_obtained_at: DateTime<Utc>,
569 parent: Option<(ExecutionId, JoinSetId)>,
570 can_be_retried: Option<Duration>,
571 reason_full: String,
572 reason_inner: String,
573 err_kind_for_logs: impl Display,
574 detail: Option<String>,
575 version: Version,
576 http_client_traces: Option<Vec<concepts::storage::http_client_trace::HttpClientTrace>>,
577) -> (ExecutionEventInner, Option<ChildFinishedResponse>, Version) {
578 if let Some(duration) = can_be_retried {
579 let expires_at = result_obtained_at + duration;
580 debug!(
581 "Retrying activity with `{err_kind_for_logs}` execution after {duration:?} at {expires_at}"
582 );
583 (
584 ExecutionEventInner::TemporarilyFailed {
585 backoff_expires_at: expires_at,
586 reason_full: StrVariant::from(reason_full),
587 reason_inner: StrVariant::from(reason_inner),
588 detail,
589 http_client_traces,
590 },
591 None,
592 version,
593 )
594 } else {
595 info!("Activity with `{err_kind_for_logs}` marked as permanent failure - {reason_inner}");
596 let result = SupportedFunctionReturnValue::ExecutionError(
597 FinishedExecutionError::PermanentFailure {
598 reason_inner,
599 reason_full,
600 kind: PermanentFailureKind::ActivityTrap,
601 detail,
602 },
603 );
604
605 let child_finished =
606 parent.map(
607 |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
608 parent_execution_id,
609 parent_join_set,
610 result: result.clone(),
611 },
612 );
613 (
614 ExecutionEventInner::Finished {
615 result,
616 http_client_traces,
617 },
618 child_finished,
619 version,
620 )
621 }
622}
623
/// Data needed to tell a parent execution that one of its child executions has finished.
#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    /// Execution that receives the response.
    pub(crate) parent_execution_id: ExecutionId,
    /// Join set of the parent that the response is recorded under.
    pub(crate) parent_join_set: JoinSetId,
    /// Return value of the finished child.
    pub(crate) result: SupportedFunctionReturnValue,
}
630
/// A pending database write: one event for an execution and, optionally, a response for its
/// parent.
#[derive(Debug, Clone)]
pub(crate) struct Append {
    /// Timestamp used for the parent response and passed to `append_batch_respond_to_parent`.
    pub(crate) created_at: DateTime<Utc>,
    /// Event appended to `execution_id`.
    pub(crate) primary_event: AppendRequest,
    pub(crate) execution_id: ExecutionId,
    /// Version passed along with the append; also reported to the parent as `finished_version`.
    pub(crate) version: Version,
    /// When set, `append` also writes a response to the parent execution.
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}
639
640impl Append {
641 pub(crate) async fn append(self, db_exec: &dyn DbExecutor) -> Result<(), DbErrorWrite> {
642 if let Some(child_finished) = self.child_finished {
643 assert_matches!(
644 &self.primary_event,
645 AppendRequest {
646 event: ExecutionEventInner::Finished { .. },
647 ..
648 }
649 );
650 let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
651 let events = AppendEventsToExecution {
652 execution_id: self.execution_id,
653 version: self.version.clone(),
654 batch: vec![self.primary_event],
655 };
656 let response = AppendResponseToExecution {
657 parent_execution_id: child_finished.parent_execution_id,
658 parent_response_event: JoinSetResponseEventOuter {
659 created_at: self.created_at,
660 event: JoinSetResponseEvent {
661 join_set_id: child_finished.parent_join_set,
662 event: JoinSetResponse::ChildExecutionFinished {
663 child_execution_id: derived,
664 finished_version: self.version,
666 result: child_finished.result,
667 },
668 },
669 },
670 };
671
672 db_exec
673 .append_batch_respond_to_parent(events, response, self.created_at)
674 .await?;
675 } else {
676 db_exec
677 .append(self.execution_id, self.version, self.primary_event)
678 .await?;
679 }
680 Ok(())
681 }
682}
683
/// A scripted worker for tests: it replays pre-recorded results instead of running anything.
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes, RETURN_TYPE_DUMMY,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    /// FFQN exported by workers built with `with_single_result` / `with_worker_results_rev`.
    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");

    /// Shared map from the expected version to the expected event history and the result to
    /// return. Entries are consumed from the end.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    /// Worker that pops one scripted entry per run and checks it against the run's context.
    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    impl SimpleWorker {
        /// Metadata for a worker that exports exactly one function named `ffqn`.
        fn single_export(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
            [FunctionMetadata {
                ffqn,
                parameter_types: ParameterTypes::default(),
                return_type: RETURN_TYPE_DUMMY,
                extension: None,
                submittable: true,
            }]
        }

        /// Worker scripted with a single result, expected at version 2 with an empty event
        /// history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let scripted = IndexMap::from([(Version::new(2), (vec![], res))]);
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(scripted)))
        }

        /// Replaces the exported function name while keeping the scripted results.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            let exported = Self::single_export(ffqn.clone());
            Self {
                worker_results_rev: self.worker_results_rev,
                ffqn,
                exported,
            }
        }

        /// Worker backed by the given result map, exporting `FFQN_SOME`.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: Self::single_export(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        /// Pops the last scripted entry, asserts that its version and event history match the
        /// context, and returns its result. Panics when no entry is left.
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            let (expected_version, (expected_eh, worker_result)) =
                self.worker_results_rev.lock().unwrap().pop().unwrap();
            trace!(
                %expected_version,
                version = %ctx.version,
                ?expected_eh,
                eh = ?ctx.event_history,
                "Running SimpleWorker"
            );
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}
763
764#[cfg(test)]
765mod tests {
766 use self::simple_worker::SimpleWorker;
767 use super::*;
768 use crate::{expired_timers_watcher, worker::WorkerResult};
769 use assert_matches::assert_matches;
770 use async_trait::async_trait;
771 use concepts::storage::DbPoolCloseable;
772 use concepts::storage::{CreateRequest, DbConnection, JoinSetRequest};
773 use concepts::storage::{ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState};
774 use concepts::time::Now;
775 use concepts::{
776 ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, RETURN_TYPE_DUMMY,
777 SUPPORTED_RETURN_VALUE_OK_EMPTY, StrVariant, SupportedFunctionReturnValue, TrapKind,
778 };
779 use db_tests::Database;
780 use indexmap::IndexMap;
781 use simple_worker::FFQN_SOME;
782 use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
783 use test_utils::set_up;
784 use test_utils::sim_clock::{ConstClock, SimClock};
785
    // Second static FFQN (`pkg/ifc` / `fn-child`), distinct from `FFQN_SOME`. Presumably used by
    // the parent/child tests further down; its usage is not visible in this part of the file.
    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
787
788 async fn tick_fn<W: Worker + Debug, C: ClockFn + 'static>(
789 config: ExecConfig,
790 clock_fn: C,
791 db_exec: Arc<dyn DbExecutor>,
792 worker: Arc<W>,
793 executed_at: DateTime<Utc>,
794 ) -> Vec<ExecutionId> {
795 trace!("Ticking with {worker:?}");
796 let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
797 let executor = ExecTask::new_test(worker, config, clock_fn, db_exec, ffqns);
798 executor
799 .tick_test_await(executed_at, RunId::generate())
800 .await
801 }
802
803 #[tokio::test]
804 async fn execute_simple_lifecycle_tick_based_mem() {
805 let created_at = Now.now();
806 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
807 execute_simple_lifecycle_tick_based(
808 db_pool.connection().as_ref(),
809 db_exec.clone(),
810 ConstClock(created_at),
811 )
812 .await;
813 db_close.close().await;
814 }
815
816 #[tokio::test]
817 async fn execute_simple_lifecycle_tick_based_sqlite() {
818 let created_at = Now.now();
819 let (_guard, db_pool, db_exec, db_close) = Database::Sqlite.set_up().await;
820 execute_simple_lifecycle_tick_based(
821 db_pool.connection().as_ref(),
822 db_exec.clone(),
823 ConstClock(created_at),
824 )
825 .await;
826 db_close.close().await;
827 }
828
829 async fn execute_simple_lifecycle_tick_based<C: ClockFn + 'static>(
830 db_connection: &dyn DbConnection,
831 db_exec: Arc<dyn DbExecutor>,
832 clock_fn: C,
833 ) {
834 set_up();
835 let created_at = clock_fn.now();
836 let exec_config = ExecConfig {
837 batch_size: 1,
838 lock_expiry: Duration::from_secs(1),
839 tick_sleep: Duration::from_millis(100),
840 component_id: ComponentId::dummy_activity(),
841 task_limiter: None,
842 executor_id: ExecutorId::generate(),
843 retry_config: ComponentRetryConfig::ZERO,
844 };
845
846 let execution_log = create_and_tick(
847 CreateAndTickConfig {
848 execution_id: ExecutionId::generate(),
849 created_at,
850 executed_at: created_at,
851 },
852 clock_fn,
853 db_connection,
854 db_exec,
855 exec_config,
856 Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
857 SUPPORTED_RETURN_VALUE_OK_EMPTY,
858 Version::new(2),
859 None,
860 ))),
861 tick_fn,
862 )
863 .await;
864 assert_matches!(
865 execution_log.events.get(2).unwrap(),
866 ExecutionEvent {
867 event: ExecutionEventInner::Finished {
868 result: SupportedFunctionReturnValue::Ok { ok: None },
869 http_client_traces: None
870 },
871 created_at: _,
872 backtrace_id: None,
873 }
874 );
875 }
876
877 #[tokio::test]
878 async fn execute_simple_lifecycle_task_based_mem() {
879 set_up();
880 let created_at = Now.now();
881 let clock_fn = ConstClock(created_at);
882 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
883 let exec_config = ExecConfig {
884 batch_size: 1,
885 lock_expiry: Duration::from_secs(1),
886 tick_sleep: Duration::ZERO,
887 component_id: ComponentId::dummy_activity(),
888 task_limiter: None,
889 executor_id: ExecutorId::generate(),
890 retry_config: ComponentRetryConfig::ZERO,
891 };
892
893 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
894 SUPPORTED_RETURN_VALUE_OK_EMPTY,
895 Version::new(2),
896 None,
897 )));
898
899 let execution_log = create_and_tick(
900 CreateAndTickConfig {
901 execution_id: ExecutionId::generate(),
902 created_at,
903 executed_at: created_at,
904 },
905 clock_fn,
906 db_pool.connection().as_ref(),
907 db_exec,
908 exec_config,
909 worker,
910 tick_fn,
911 )
912 .await;
913 assert_matches!(
914 execution_log.events.get(2).unwrap(),
915 ExecutionEvent {
916 event: ExecutionEventInner::Finished {
917 result: SupportedFunctionReturnValue::Ok { ok: None },
918 http_client_traces: None
919 },
920 created_at: _,
921 backtrace_id: None,
922 }
923 );
924 db_close.close().await;
925 }
926
    /// Inputs of `create_and_tick`.
    struct CreateAndTickConfig {
        /// Id of the execution to create.
        execution_id: ExecutionId,
        /// Used as both `created_at` and `scheduled_at` of the create request.
        created_at: DateTime<Utc>,
        /// Timestamp handed to the tick function.
        executed_at: DateTime<Utc>,
    }
932
    /// Creates an execution of `FFQN_SOME`, runs the supplied `tick` once and returns the
    /// execution log fetched afterwards.
    ///
    /// Before returning it asserts that the log starts with `Created` (at `config.created_at`),
    /// followed by `Locked` (not earlier than the creation) and a third event created no
    /// earlier than the lock. It panics if any database call fails or fewer than three events
    /// exist.
    async fn create_and_tick<
        W: Worker,
        C: ClockFn,
        T: FnMut(ExecConfig, C, Arc<dyn DbExecutor>, Arc<W>, DateTime<Utc>) -> F,
        F: Future<Output = Vec<ExecutionId>>,
    >(
        config: CreateAndTickConfig,
        clock_fn: C,
        db_connection: &dyn DbConnection,
        db_exec: Arc<dyn DbExecutor>,
        exec_config: ExecConfig,
        worker: Arc<W>,
        mut tick: T,
    ) -> ExecutionLog {
        // Create the execution, scheduled at its creation time.
        db_connection
            .create(CreateRequest {
                created_at: config.created_at,
                execution_id: config.execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: config.created_at,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();
        // Run one tick; the returned execution ids are not needed here.
        tick(exec_config, clock_fn, db_exec, worker, config.executed_at).await;
        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
        debug!("Execution history after tick: {execution_log:?}");
        // Event 0: `Created`, at exactly the requested timestamp.
        assert_matches!(
            execution_log.events.first().unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Created { .. },
                created_at: actually_created_at,
                backtrace_id: None,
            }
            if config.created_at == *actually_created_at
        );
        // Event 1: `Locked`, not earlier than the creation.
        let locked_at = assert_matches!(
            execution_log.events.get(1).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: locked_at,
                backtrace_id: None,
            } if config.created_at <= *locked_at
            => *locked_at
        );
        // Event 2: any event, not earlier than the lock.
        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
            event: _,
            created_at: executed_at,
            backtrace_id: None,
        } if *executed_at >= locked_at);
        execution_log
    }
992
    /// With one retry configured, an activity trap must produce a `TemporarilyFailed` event.
    /// The execution must not be picked up before the backoff expires and must finish once the
    /// simulated clock has been moved past it.
    #[tokio::test]
    async fn activity_trap_should_trigger_an_execution_retry() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
        let retry_exp_backoff = Duration::from_millis(100);
        let retry_config = ComponentRetryConfig {
            max_retries: 1,
            retry_exp_backoff,
        };
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
            retry_config,
        };
        let expected_reason = "error reason";
        let expected_detail = "error detail";
        // First run: the worker reports an activity trap.
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: Some(expected_detail.to_string()),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at: sim_clock.now(),
                executed_at: sim_clock.now(),
            },
            sim_clock.clone(),
            db_pool.connection().as_ref(),
            db_exec.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        assert_eq!(3, execution_log.events.len());
        {
            // The third event must be the temporary failure carrying the trap's reason and detail.
            let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
                &execution_log.events.get(2).unwrap(),
                ExecutionEvent {
                    event: ExecutionEventInner::TemporarilyFailed {
                        reason_inner,
                        reason_full,
                        detail,
                        backoff_expires_at,
                        http_client_traces: None,
                    },
                    created_at: at,
                    backtrace_id: None,
                }
                => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
            );
            assert_eq!(expected_reason, reason_inner.deref());
            assert_eq!(
                format!("activity trap: {expected_reason}"),
                reason_full.deref()
            );
            assert_eq!(Some(expected_detail), detail.as_deref());
            assert_eq!(at, sim_clock.now());
            assert_eq!(sim_clock.now() + retry_config.retry_exp_backoff, expires_at);
        }
        // Second run: the worker succeeds and expects to be called at version 4.
        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
            std::sync::Mutex::new(IndexMap::from([(
                Version::new(4),
                (
                    vec![],
                    WorkerResult::Ok(SUPPORTED_RETURN_VALUE_OK_EMPTY, Version::new(4), None),
                ),
            )])),
        )));
        // Ticking before the clock is advanced must not run any execution.
        assert!(
            tick_fn(
                exec_config.clone(),
                sim_clock.clone(),
                db_exec.clone(),
                worker.clone(),
                sim_clock.now(),
            )
            .await
            .is_empty()
        );
        // Advance the simulated clock by the backoff and tick again.
        sim_clock.move_time_forward(retry_config.retry_exp_backoff);
        tick_fn(
            exec_config,
            sim_clock.clone(),
            db_exec.clone(),
            worker,
            sim_clock.now(),
        )
        .await;
        let execution_log = {
            let db_connection = db_pool.connection();
            db_connection
                .get(&execution_log.execution_id)
                .await
                .unwrap()
        };
        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
        // The retry shows up as a second `Locked` event followed by `Finished`, both created at
        // the advanced time.
        assert_matches!(
            execution_log.events.get(3).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: at,
                backtrace_id: None,
            } if *at == sim_clock.now()
        );
        assert_matches!(
            execution_log.events.get(4).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: SupportedFunctionReturnValue::Ok{ok:None},
                    http_client_traces: None
                },
                created_at: finished_at,
                backtrace_id: None,
            } if *finished_at == sim_clock.now()
        );
        db_close.close().await;
    }
1124
1125 #[tokio::test]
1126 async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1127 set_up();
1128 let created_at = Now.now();
1129 let clock_fn = ConstClock(created_at);
1130 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1131 let exec_config = ExecConfig {
1132 batch_size: 1,
1133 lock_expiry: Duration::from_secs(1),
1134 tick_sleep: Duration::ZERO,
1135 component_id: ComponentId::dummy_activity(),
1136 task_limiter: None,
1137 executor_id: ExecutorId::generate(),
1138 retry_config: ComponentRetryConfig::ZERO,
1139 };
1140
1141 let expected_reason = "error reason";
1142 let expected_detail = "error detail";
1143 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1144 WorkerError::ActivityTrap {
1145 reason: expected_reason.to_string(),
1146 trap_kind: concepts::TrapKind::Trap,
1147 detail: Some(expected_detail.to_string()),
1148 version: Version::new(2),
1149 http_client_traces: None,
1150 },
1151 )));
1152 let execution_log = create_and_tick(
1153 CreateAndTickConfig {
1154 execution_id: ExecutionId::generate(),
1155 created_at,
1156 executed_at: created_at,
1157 },
1158 clock_fn,
1159 db_pool.connection().as_ref(),
1160 db_exec,
1161 exec_config.clone(),
1162 worker,
1163 tick_fn,
1164 )
1165 .await;
1166 assert_eq!(3, execution_log.events.len());
1167 let (reason, kind, detail) = assert_matches!(
1168 &execution_log.events.get(2).unwrap(),
1169 ExecutionEvent {
1170 event: ExecutionEventInner::Finished{
1171 result: SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_}),
1172 http_client_traces: None
1173 },
1174 created_at: at,
1175 backtrace_id: None,
1176 } if *at == created_at
1177 => (reason_inner, kind, detail)
1178 );
1179 assert_eq!(expected_reason, *reason);
1180 assert_eq!(Some(expected_detail), detail.as_deref());
1181 assert_eq!(PermanentFailureKind::ActivityTrap, *kind);
1182
1183 db_close.close().await;
1184 }
1185
1186 #[tokio::test]
1187 async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1188 let worker_error = WorkerError::ActivityTrap {
1189 reason: "error reason".to_string(),
1190 trap_kind: TrapKind::Trap,
1191 detail: Some("detail".to_string()),
1192 version: Version::new(2),
1193 http_client_traces: None,
1194 };
1195 let expected_child_err = FinishedExecutionError::PermanentFailure {
1196 reason_full: "activity trap: error reason".to_string(),
1197 reason_inner: "error reason".to_string(),
1198 kind: PermanentFailureKind::ActivityTrap,
1199 detail: Some("detail".to_string()),
1200 };
1201 child_execution_permanently_failed_should_notify_parent(
1202 WorkerResult::Err(worker_error),
1203 expected_child_err,
1204 )
1205 .await;
1206 }
1207
1208 #[tokio::test]
1211 async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1212 let expected_child_err = FinishedExecutionError::PermanentTimeout;
1213 child_execution_permanently_failed_should_notify_parent(
1214 WorkerResult::DbUpdatedByWorkerOrWatcher,
1215 expected_child_err,
1216 )
1217 .await;
1218 }
1219
1220 async fn child_execution_permanently_failed_should_notify_parent(
1221 worker_result: WorkerResult,
1222 expected_child_err: FinishedExecutionError,
1223 ) {
1224 use concepts::storage::JoinSetResponseEventOuter;
1225 const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1226
1227 set_up();
1228 let sim_clock = SimClock::default();
1229 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1230
1231 let parent_worker = Arc::new(SimpleWorker::with_single_result(
1232 WorkerResult::DbUpdatedByWorkerOrWatcher,
1233 ));
1234 let parent_execution_id = ExecutionId::generate();
1235 db_pool
1236 .connection()
1237 .create(CreateRequest {
1238 created_at: sim_clock.now(),
1239 execution_id: parent_execution_id.clone(),
1240 ffqn: FFQN_SOME,
1241 params: Params::empty(),
1242 parent: None,
1243 metadata: concepts::ExecutionMetadata::empty(),
1244 scheduled_at: sim_clock.now(),
1245 component_id: ComponentId::dummy_activity(),
1246 scheduled_by: None,
1247 })
1248 .await
1249 .unwrap();
1250 tick_fn(
1251 ExecConfig {
1252 batch_size: 1,
1253 lock_expiry: LOCK_EXPIRY,
1254 tick_sleep: Duration::ZERO,
1255 component_id: ComponentId::dummy_activity(),
1256 task_limiter: None,
1257 executor_id: ExecutorId::generate(),
1258 retry_config: ComponentRetryConfig::ZERO,
1259 },
1260 sim_clock.clone(),
1261 db_exec.clone(),
1262 parent_worker,
1263 sim_clock.now(),
1264 )
1265 .await;
1266
1267 let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1268 let child_execution_id = parent_execution_id.next_level(&join_set_id);
1269 {
1271 let params = Params::empty();
1272 let child = CreateRequest {
1273 created_at: sim_clock.now(),
1274 execution_id: ExecutionId::Derived(child_execution_id.clone()),
1275 ffqn: FFQN_CHILD,
1276 params: params.clone(),
1277 parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1278 metadata: concepts::ExecutionMetadata::empty(),
1279 scheduled_at: sim_clock.now(),
1280 component_id: ComponentId::dummy_activity(),
1281 scheduled_by: None,
1282 };
1283 let current_time = sim_clock.now();
1284 let join_set = AppendRequest {
1285 created_at: current_time,
1286 event: ExecutionEventInner::HistoryEvent {
1287 event: HistoryEvent::JoinSetCreate {
1288 join_set_id: join_set_id.clone(),
1289 closing_strategy: ClosingStrategy::Complete,
1290 },
1291 },
1292 };
1293 let child_exec_req = AppendRequest {
1294 created_at: current_time,
1295 event: ExecutionEventInner::HistoryEvent {
1296 event: HistoryEvent::JoinSetRequest {
1297 join_set_id: join_set_id.clone(),
1298 request: JoinSetRequest::ChildExecutionRequest {
1299 child_execution_id: child_execution_id.clone(),
1300 target_ffqn: FFQN_CHILD,
1301 params,
1302 },
1303 },
1304 },
1305 };
1306 let join_next = AppendRequest {
1307 created_at: current_time,
1308 event: ExecutionEventInner::HistoryEvent {
1309 event: HistoryEvent::JoinNext {
1310 join_set_id: join_set_id.clone(),
1311 run_expires_at: sim_clock.now(),
1312 closing: false,
1313 requested_ffqn: Some(FFQN_CHILD),
1314 },
1315 },
1316 };
1317 db_pool
1318 .connection()
1319 .append_batch_create_new_execution(
1320 current_time,
1321 vec![join_set, child_exec_req, join_next],
1322 parent_execution_id.clone(),
1323 Version::new(2),
1324 vec![child],
1325 )
1326 .await
1327 .unwrap();
1328 }
1329
1330 let child_worker =
1331 Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));
1332
1333 tick_fn(
1335 ExecConfig {
1336 batch_size: 1,
1337 lock_expiry: LOCK_EXPIRY,
1338 tick_sleep: Duration::ZERO,
1339 component_id: ComponentId::dummy_activity(),
1340 task_limiter: None,
1341 executor_id: ExecutorId::generate(),
1342 retry_config: ComponentRetryConfig::ZERO,
1343 },
1344 sim_clock.clone(),
1345 db_exec.clone(),
1346 child_worker,
1347 sim_clock.now(),
1348 )
1349 .await;
1350 if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
1351 sim_clock.move_time_forward(LOCK_EXPIRY);
1353 expired_timers_watcher::tick(db_pool.connection().as_ref(), sim_clock.now())
1354 .await
1355 .unwrap();
1356 }
1357 let child_log = db_pool
1358 .connection()
1359 .get(&ExecutionId::Derived(child_execution_id.clone()))
1360 .await
1361 .unwrap();
1362 assert!(child_log.pending_state.is_finished());
1363 assert_eq!(
1364 Version(2),
1365 child_log.next_version,
1366 "created = 0, locked = 1, with_single_result = 2"
1367 );
1368 assert_eq!(
1369 ExecutionEventInner::Finished {
1370 result: SupportedFunctionReturnValue::ExecutionError(expected_child_err),
1371 http_client_traces: None
1372 },
1373 child_log.last_event().event
1374 );
1375 let parent_log = db_pool
1376 .connection()
1377 .get(&parent_execution_id)
1378 .await
1379 .unwrap();
1380 assert_matches!(
1381 parent_log.pending_state,
1382 PendingState::PendingAt {
1383 scheduled_at
1384 } if scheduled_at == sim_clock.now(),
1385 "parent should be back to pending"
1386 );
1387 let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1388 parent_log.responses.last(),
1389 Some(JoinSetResponseEventOuter{
1390 created_at: at,
1391 event: JoinSetResponseEvent{
1392 join_set_id: found_join_set_id,
1393 event: JoinSetResponse::ChildExecutionFinished {
1394 child_execution_id: found_child_execution_id,
1395 finished_version,
1396 result: found_result,
1397 }
1398 }
1399 })
1400 if *at == sim_clock.now()
1401 => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1402 );
1403 assert_eq!(join_set_id, *found_join_set_id);
1404 assert_eq!(child_execution_id, *found_child_execution_id);
1405 assert_eq!(child_log.next_version, *child_finished_version);
1406 assert_matches!(
1407 found_result,
1408 SupportedFunctionReturnValue::ExecutionError(_)
1409 );
1410
1411 db_close.close().await;
1412 }
1413
1414 #[derive(Clone, Debug)]
1415 struct SleepyWorker {
1416 duration: Duration,
1417 result: SupportedFunctionReturnValue,
1418 exported: [FunctionMetadata; 1],
1419 }
1420
1421 #[async_trait]
1422 impl Worker for SleepyWorker {
1423 async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1424 tokio::time::sleep(self.duration).await;
1425 WorkerResult::Ok(self.result.clone(), ctx.version, None)
1426 }
1427
1428 fn exported_functions(&self) -> &[FunctionMetadata] {
1429 &self.exported
1430 }
1431 }
1432
1433 #[tokio::test]
1434 async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1435 set_up();
1436 let sim_clock = SimClock::default();
1437 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1438 let lock_expiry = Duration::from_millis(100);
1439 let timeout_duration = Duration::from_millis(300);
1440 let retry_config = ComponentRetryConfig {
1441 max_retries: 1,
1442 retry_exp_backoff: timeout_duration,
1443 };
1444 let exec_config = ExecConfig {
1445 batch_size: 1,
1446 lock_expiry,
1447 tick_sleep: Duration::ZERO,
1448 component_id: ComponentId::dummy_activity(),
1449 task_limiter: None,
1450 executor_id: ExecutorId::generate(),
1451 retry_config,
1452 };
1453
1454 let worker = Arc::new(SleepyWorker {
1455 duration: lock_expiry + Duration::from_millis(1), result: SUPPORTED_RETURN_VALUE_OK_EMPTY,
1457 exported: [FunctionMetadata {
1458 ffqn: FFQN_SOME,
1459 parameter_types: ParameterTypes::default(),
1460 return_type: RETURN_TYPE_DUMMY,
1461 extension: None,
1462 submittable: true,
1463 }],
1464 });
1465 let execution_id = ExecutionId::generate();
1467 let db_connection = db_pool.connection();
1468 db_connection
1469 .create(CreateRequest {
1470 created_at: sim_clock.now(),
1471 execution_id: execution_id.clone(),
1472 ffqn: FFQN_SOME,
1473 params: Params::empty(),
1474 parent: None,
1475 metadata: concepts::ExecutionMetadata::empty(),
1476 scheduled_at: sim_clock.now(),
1477 component_id: ComponentId::dummy_activity(),
1478 scheduled_by: None,
1479 })
1480 .await
1481 .unwrap();
1482
1483 let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
1484 let executor = ExecTask::new_test(
1485 worker,
1486 exec_config,
1487 sim_clock.clone(),
1488 db_exec.clone(),
1489 ffqns,
1490 );
1491 let mut first_execution_progress = executor
1492 .tick(sim_clock.now(), RunId::generate())
1493 .await
1494 .unwrap();
1495 assert_eq!(1, first_execution_progress.executions.len());
1496 sim_clock.move_time_forward(lock_expiry);
1498 let now_after_first_lock_expiry = sim_clock.now();
1500 {
1501 debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1502 let cleanup_progress = executor
1503 .tick(now_after_first_lock_expiry, RunId::generate())
1504 .await
1505 .unwrap();
1506 assert!(cleanup_progress.executions.is_empty());
1507 }
1508 {
1509 let expired_locks = expired_timers_watcher::tick(
1510 db_pool.connection().as_ref(),
1511 now_after_first_lock_expiry,
1512 )
1513 .await
1514 .unwrap()
1515 .expired_locks;
1516 assert_eq!(1, expired_locks);
1517 }
1518 assert!(
1519 !first_execution_progress
1520 .executions
1521 .pop()
1522 .unwrap()
1523 .1
1524 .is_finished()
1525 );
1526
1527 let execution_log = db_connection.get(&execution_id).await.unwrap();
1528 let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1529 assert_matches!(
1530 &execution_log.events.get(2).unwrap(),
1531 ExecutionEvent {
1532 event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
1533 created_at: at,
1534 backtrace_id: None,
1535 } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1536 );
1537 assert_eq!(
1538 PendingState::PendingAt {
1539 scheduled_at: expected_first_timeout_expiry
1540 },
1541 execution_log.pending_state
1542 );
1543 sim_clock.move_time_forward(timeout_duration);
1544 let now_after_first_timeout = sim_clock.now();
1545 debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1546
1547 let mut second_execution_progress = executor
1548 .tick(now_after_first_timeout, RunId::generate())
1549 .await
1550 .unwrap();
1551 assert_eq!(1, second_execution_progress.executions.len());
1552
1553 sim_clock.move_time_forward(lock_expiry);
1555 let now_after_second_lock_expiry = sim_clock.now();
1557 debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1558 {
1559 let cleanup_progress = executor
1560 .tick(now_after_second_lock_expiry, RunId::generate())
1561 .await
1562 .unwrap();
1563 assert!(cleanup_progress.executions.is_empty());
1564 }
1565 {
1566 let expired_locks = expired_timers_watcher::tick(
1567 db_pool.connection().as_ref(),
1568 now_after_second_lock_expiry,
1569 )
1570 .await
1571 .unwrap()
1572 .expired_locks;
1573 assert_eq!(1, expired_locks);
1574 }
1575 assert!(
1576 !second_execution_progress
1577 .executions
1578 .pop()
1579 .unwrap()
1580 .1
1581 .is_finished()
1582 );
1583
1584 drop(db_connection);
1585 drop(executor);
1586 db_close.close().await;
1587 }
1588}