1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::prefixed_ulid::RunId;
5use concepts::storage::{
6 AppendRequest, DbPool, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
7 LockedExecution,
8};
9use concepts::time::ClockFn;
10use concepts::{ComponentId, FinishedExecutionResult, FunctionMetadata, StrVariant};
11use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
12use concepts::{
13 FinishedExecutionError,
14 storage::{DbConnection, DbError, ExecutionEventInner, JoinSetResponse, Version},
15};
16use concepts::{JoinSetId, PermanentFailureKind};
17use std::fmt::Display;
18use std::{
19 sync::{
20 Arc,
21 atomic::{AtomicBool, Ordering},
22 },
23 time::Duration,
24};
25use tokio::task::{AbortHandle, JoinHandle};
26use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
27
/// Settings and identity of a single executor instance.
#[derive(Debug, Clone)]
pub struct ExecConfig {
    /// Added to a tick's `executed_at` to obtain both the lock expiry passed to
    /// `lock_pending` and the execution deadline handed to the worker.
    pub lock_expiry: Duration,
    /// Passed to `wait_for_pending` between ticks of the spawned loop
    /// (presumably the maximum wait — confirm against the `DbConnection` implementation).
    pub tick_sleep: Duration,
    /// Maximum number of executions a single tick tries to lock.
    pub batch_size: u32,
    /// Component id passed to `lock_pending` and attached to tracing spans.
    pub component_id: ComponentId,
    /// Optional semaphore bounding concurrently running executions: each spawned execution
    /// task holds one permit until it finishes.
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
    /// Identifier passed to `lock_pending` and attached to tracing spans.
    pub executor_id: ExecutorId,
}
37
/// Executor bound to one worker: it locks pending executions of the configured functions
/// and runs each of them on its own tokio task.
pub struct ExecTask<C: ClockFn> {
    /// Worker that runs every locked execution.
    worker: Arc<dyn Worker>,
    /// Executor settings.
    config: ExecConfig,
    /// Clock used to timestamp ticks and worker results.
    clock_fn: C,
    /// Database access for locking executions and appending results.
    db_pool: Arc<dyn DbPool>,
    /// Function names passed to `lock_pending`; `spawn_new` derives them from the
    /// worker's exported functions.
    ffqns: Arc<[FunctionFqn]>,
}
45
46#[derive(derive_more::Debug, Default)]
47pub struct ExecutionProgress {
48 #[debug(skip)]
49 #[allow(dead_code)]
50 executions: Vec<(ExecutionId, JoinHandle<()>)>,
51}
52
53impl ExecutionProgress {
54 #[cfg(feature = "test")]
55 pub async fn wait_for_tasks(self) -> Result<usize, tokio::task::JoinError> {
56 let execs = self.executions.len();
57 for (_, handle) in self.executions {
58 handle.await?;
59 }
60 Ok(execs)
61 }
62}
63
64#[derive(derive_more::Debug)]
65pub struct ExecutorTaskHandle {
66 #[debug(skip)]
67 is_closing: Arc<AtomicBool>,
68 #[debug(skip)]
69 abort_handle: AbortHandle,
70 component_id: ComponentId,
71 executor_id: ExecutorId,
72}
73
74impl ExecutorTaskHandle {
75 #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
76 pub async fn close(&self) {
77 trace!("Gracefully closing");
78 self.is_closing.store(true, Ordering::Relaxed);
79 while !self.abort_handle.is_finished() {
80 tokio::time::sleep(Duration::from_millis(1)).await;
81 }
82 debug!("Gracefully closed");
83 }
84}
85
86impl Drop for ExecutorTaskHandle {
87 #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
88 fn drop(&mut self) {
89 if self.abort_handle.is_finished() {
90 return;
91 }
92 warn!("Aborting the executor task");
93 self.abort_handle.abort();
94 }
95}
96
/// Test-feature entry point exposing `extract_exported_ffqns_noext` outside this module.
#[cfg(feature = "test")]
pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    self::extract_exported_ffqns_noext(worker)
}
101
102fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
103 worker
104 .exported_functions()
105 .iter()
106 .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
107 .collect::<Arc<_>>()
108}
109
110impl<C: ClockFn + 'static> ExecTask<C> {
111 #[cfg(feature = "test")]
112 pub fn new(
113 worker: Arc<dyn Worker>,
114 config: ExecConfig,
115 clock_fn: C,
116 db_pool: Arc<dyn DbPool>,
117 ffqns: Arc<[FunctionFqn]>,
118 ) -> Self {
119 Self {
120 worker,
121 config,
122 clock_fn,
123 db_pool,
124 ffqns,
125 }
126 }
127
128 pub fn spawn_new(
129 worker: Arc<dyn Worker>,
130 config: ExecConfig,
131 clock_fn: C,
132 db_pool: Arc<dyn DbPool>,
133 ) -> ExecutorTaskHandle {
134 let is_closing = Arc::new(AtomicBool::default());
135 let is_closing_inner = is_closing.clone();
136 let ffqns = extract_exported_ffqns_noext(worker.as_ref());
137 let component_id = config.component_id.clone();
138 let executor_id = config.executor_id;
139 let abort_handle = tokio::spawn(async move {
140 debug!(executor_id = %config.executor_id, component_id = %config.component_id, "Spawned executor");
141 let task = Self {
142 worker,
143 config,
144 db_pool,
145 ffqns: ffqns.clone(),
146 clock_fn: clock_fn.clone(),
147 };
148 loop {
149 let _ = task.tick(clock_fn.now(), RunId::generate()).await;
150 let executed_at = clock_fn.now();
151 task.db_pool
152 .connection()
153 .wait_for_pending(executed_at, ffqns.clone(), task.config.tick_sleep)
154 .await;
155 if is_closing_inner.load(Ordering::Relaxed) {
156 return;
157 }
158 }
159 })
160 .abort_handle();
161 ExecutorTaskHandle {
162 is_closing,
163 abort_handle,
164 component_id,
165 executor_id,
166 }
167 }
168
169 fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
170 if let Some(task_limiter) = &self.config.task_limiter {
171 let mut locks = Vec::new();
172 for _ in 0..self.config.batch_size {
173 if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
174 locks.push(Some(permit));
175 } else {
176 break;
177 }
178 }
179 locks
180 } else {
181 let mut vec = Vec::with_capacity(self.config.batch_size as usize);
182 for _ in 0..self.config.batch_size {
183 vec.push(None);
184 }
185 vec
186 }
187 }
188
189 #[cfg(feature = "test")]
190 pub async fn tick_test(
191 &self,
192 executed_at: DateTime<Utc>,
193 run_id: RunId,
194 ) -> Result<ExecutionProgress, ()> {
195 self.tick(executed_at, run_id).await
196 }
197
198 #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.config.executor_id, component_id = %self.config.component_id))]
199 async fn tick(
200 &self,
201 executed_at: DateTime<Utc>,
202 run_id: RunId,
203 ) -> Result<ExecutionProgress, ()> {
204 let locked_executions = {
205 let mut permits = self.acquire_task_permits();
206 if permits.is_empty() {
207 return Ok(ExecutionProgress::default());
208 }
209 let db_connection = self.db_pool.connection();
210 let lock_expires_at = executed_at + self.config.lock_expiry;
211 let locked_executions = db_connection
212 .lock_pending(
213 permits.len(), executed_at, self.ffqns.clone(),
216 executed_at, self.config.component_id.clone(),
218 self.config.executor_id,
219 lock_expires_at,
220 run_id,
221 )
222 .await
223 .map_err(|err| {
224 warn!("lock_pending error {err:?}");
225 })?;
226 while permits.len() > locked_executions.len() {
228 permits.pop();
229 }
230 assert_eq!(permits.len(), locked_executions.len());
231 locked_executions.into_iter().zip(permits)
232 };
233 let execution_deadline = executed_at + self.config.lock_expiry;
234
235 let mut executions = Vec::with_capacity(locked_executions.len());
236 for (locked_execution, permit) in locked_executions {
237 let execution_id = locked_execution.execution_id.clone();
238 let join_handle = {
239 let worker = self.worker.clone();
240 let db_pool = self.db_pool.clone();
241 let clock_fn = self.clock_fn.clone();
242 let run_id = locked_execution.run_id;
243 let worker_span = info_span!(parent: None, "worker",
244 "otel.name" = format!("worker {}", locked_execution.ffqn),
245 %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.config.executor_id, component_id = %self.config.component_id);
246 locked_execution.metadata.enrich(&worker_span);
247 tokio::spawn({
248 let worker_span2 = worker_span.clone();
249 async move {
250 let res = Self::run_worker(
251 worker,
252 db_pool.as_ref(),
253 execution_deadline,
254 clock_fn,
255 locked_execution,
256 worker_span2,
257 )
258 .await;
259 if let Err(db_error) = res {
260 error!("Execution will be timed out not writing `{db_error:?}`");
261 }
262 drop(permit);
263 }
264 .instrument(worker_span)
265 })
266 };
267 executions.push((execution_id, join_handle));
268 }
269 Ok(ExecutionProgress { executions })
270 }
271
272 async fn run_worker(
273 worker: Arc<dyn Worker>,
274 db_pool: &dyn DbPool,
275 execution_deadline: DateTime<Utc>,
276 clock_fn: C,
277 locked_execution: LockedExecution,
278 worker_span: Span,
279 ) -> Result<(), DbError> {
280 debug!("Worker::run starting");
281 trace!(
282 version = %locked_execution.version,
283 params = ?locked_execution.params,
284 event_history = ?locked_execution.event_history,
285 "Worker::run starting"
286 );
287 let can_be_retried = ExecutionLog::can_be_retried_after(
288 locked_execution.temporary_event_count + 1,
289 locked_execution.max_retries,
290 locked_execution.retry_exp_backoff,
291 );
292 let unlock_expiry_on_limit_reached =
293 ExecutionLog::compute_retry_duration_when_retrying_forever(
294 locked_execution.temporary_event_count + 1,
295 locked_execution.retry_exp_backoff,
296 );
297 let ctx = WorkerContext {
298 execution_id: locked_execution.execution_id.clone(),
299 metadata: locked_execution.metadata,
300 ffqn: locked_execution.ffqn,
301 params: locked_execution.params,
302 event_history: locked_execution.event_history,
303 responses: locked_execution
304 .responses
305 .into_iter()
306 .map(|outer| outer.event)
307 .collect(),
308 version: locked_execution.version,
309 execution_deadline,
310 can_be_retried: can_be_retried.is_some(),
311 run_id: locked_execution.run_id,
312 worker_span,
313 };
314 let worker_result = worker.run(ctx).await;
315 trace!(?worker_result, "Worker::run finished");
316 let result_obtained_at = clock_fn.now();
317 match Self::worker_result_to_execution_event(
318 locked_execution.execution_id,
319 worker_result,
320 result_obtained_at,
321 locked_execution.parent,
322 can_be_retried,
323 unlock_expiry_on_limit_reached,
324 )? {
325 Some(append) => {
326 let db_connection = db_pool.connection();
327 trace!("Appending {append:?}");
328 append.append(db_connection.as_ref()).await
329 }
330 None => Ok(()),
331 }
332 }
333
334 fn worker_result_to_execution_event(
337 execution_id: ExecutionId,
338 worker_result: WorkerResult,
339 result_obtained_at: DateTime<Utc>,
340 parent: Option<(ExecutionId, JoinSetId)>,
341 can_be_retried: Option<Duration>,
342 unlock_expiry_on_limit_reached: Duration,
343 ) -> Result<Option<Append>, DbError> {
344 Ok(match worker_result {
345 WorkerResult::Ok(result, new_version, http_client_traces) => {
346 info!(
347 "Execution finished: {}",
348 result.as_pending_state_finished_result()
349 );
350 let child_finished =
351 parent.map(
352 |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
353 parent_execution_id,
354 parent_join_set,
355 result: Ok(result.clone()),
356 },
357 );
358 let primary_event = AppendRequest {
359 created_at: result_obtained_at,
360 event: ExecutionEventInner::Finished {
361 result: Ok(result),
362 http_client_traces,
363 },
364 };
365
366 Some(Append {
367 created_at: result_obtained_at,
368 primary_event,
369 execution_id,
370 version: new_version,
371 child_finished,
372 })
373 }
374 WorkerResult::DbUpdatedByWorkerOrWatcher => None,
375 WorkerResult::Err(err) => {
376 let reason_full = err.to_string(); let (primary_event, child_finished, version) = match err {
378 WorkerError::TemporaryTimeout {
379 http_client_traces,
380 version,
381 } => {
382 if let Some(duration) = can_be_retried {
383 let backoff_expires_at = result_obtained_at + duration;
384 info!(
385 "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
386 );
387 (
388 ExecutionEventInner::TemporarilyTimedOut {
389 backoff_expires_at,
390 http_client_traces,
391 },
392 None,
393 version,
394 )
395 } else {
396 info!("Permanent timeout");
397 let result = Err(FinishedExecutionError::PermanentTimeout);
398 let child_finished =
399 parent.map(|(parent_execution_id, parent_join_set)| {
400 ChildFinishedResponse {
401 parent_execution_id,
402 parent_join_set,
403 result: result.clone(),
404 }
405 });
406 (
407 ExecutionEventInner::Finished {
408 result,
409 http_client_traces,
410 },
411 child_finished,
412 version,
413 )
414 }
415 }
416 WorkerError::DbError(db_error) => {
417 return Err(db_error);
418 }
419 WorkerError::ActivityTrap {
420 reason: reason_inner,
421 trap_kind,
422 detail,
423 version,
424 http_client_traces,
425 } => retry_or_fail(
426 result_obtained_at,
427 parent,
428 can_be_retried,
429 reason_full,
430 reason_inner,
431 trap_kind, Some(detail),
433 version,
434 http_client_traces,
435 ),
436 WorkerError::ActivityPreopenedDirError {
437 reason_kind,
438 reason_inner,
439 version,
440 } => retry_or_fail(
441 result_obtained_at,
442 parent,
443 can_be_retried,
444 reason_full,
445 reason_inner,
446 reason_kind, None, version,
449 None, ),
451 WorkerError::ActivityReturnedError {
452 detail,
453 version,
454 http_client_traces,
455 } => {
456 let duration = can_be_retried.expect(
457 "ActivityReturnedError must not be returned when retries are exhausted",
458 );
459 let expires_at = result_obtained_at + duration;
460 debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
461 (
462 ExecutionEventInner::TemporarilyFailed {
463 backoff_expires_at: expires_at,
464 reason_full: StrVariant::Static("activity returned error"), reason_inner: StrVariant::Static("activity returned error"),
466 detail, http_client_traces,
468 },
469 None,
470 version,
471 )
472 }
473 WorkerError::TemporaryWorkflowTrap {
474 reason: reason_inner,
475 kind,
476 detail,
477 version,
478 } => {
479 let duration = can_be_retried.expect("workflows are retried forever");
480 let expires_at = result_obtained_at + duration;
481 debug!(
482 "Retrying workflow {kind} execution after {duration:?} at {expires_at}"
483 );
484 (
485 ExecutionEventInner::TemporarilyFailed {
486 backoff_expires_at: expires_at,
487 reason_full: StrVariant::from(reason_full),
488 reason_inner: StrVariant::from(reason_inner),
489 detail,
490 http_client_traces: None,
491 },
492 None,
493 version,
494 )
495 }
496 WorkerError::LimitReached {
497 reason: inner_reason,
498 version: new_version,
499 } => {
500 let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
501 warn!(
502 "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
503 );
504 (
505 ExecutionEventInner::Unlocked {
506 backoff_expires_at: expires_at,
507 reason: StrVariant::from(reason_full),
508 },
509 None,
510 new_version,
511 )
512 }
513 WorkerError::FatalError(fatal_error, version) => {
514 info!("Fatal worker error - {fatal_error:?}");
515 let result = Err(FinishedExecutionError::from(fatal_error));
516 let child_finished =
517 parent.map(|(parent_execution_id, parent_join_set)| {
518 ChildFinishedResponse {
519 parent_execution_id,
520 parent_join_set,
521 result: result.clone(),
522 }
523 });
524 (
525 ExecutionEventInner::Finished {
526 result,
527 http_client_traces: None,
528 },
529 child_finished,
530 version,
531 )
532 }
533 };
534 Some(Append {
535 created_at: result_obtained_at,
536 primary_event: AppendRequest {
537 created_at: result_obtained_at,
538 event: primary_event,
539 },
540 execution_id,
541 version,
542 child_finished,
543 })
544 }
545 })
546 }
547}
548
549#[expect(clippy::too_many_arguments)]
550fn retry_or_fail(
551 result_obtained_at: DateTime<Utc>,
552 parent: Option<(ExecutionId, JoinSetId)>,
553 can_be_retried: Option<Duration>,
554 reason_full: String,
555 reason_inner: String,
556 err_kind_for_logs: impl Display,
557 detail: Option<String>,
558 version: Version,
559 http_client_traces: Option<Vec<concepts::storage::http_client_trace::HttpClientTrace>>,
560) -> (ExecutionEventInner, Option<ChildFinishedResponse>, Version) {
561 if let Some(duration) = can_be_retried {
562 let expires_at = result_obtained_at + duration;
563 debug!(
564 "Retrying activity with `{err_kind_for_logs}` execution after {duration:?} at {expires_at}"
565 );
566 (
567 ExecutionEventInner::TemporarilyFailed {
568 backoff_expires_at: expires_at,
569 reason_full: StrVariant::from(reason_full),
570 reason_inner: StrVariant::from(reason_inner),
571 detail,
572 http_client_traces,
573 },
574 None,
575 version,
576 )
577 } else {
578 info!("Activity with `{err_kind_for_logs}` marked as permanent failure - {reason_inner}");
579 let result = Err(FinishedExecutionError::PermanentFailure {
580 reason_inner,
581 reason_full,
582 kind: PermanentFailureKind::ActivityTrap,
583 detail,
584 });
585 let child_finished =
586 parent.map(
587 |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
588 parent_execution_id,
589 parent_join_set,
590 result: result.clone(),
591 },
592 );
593 (
594 ExecutionEventInner::Finished {
595 result,
596 http_client_traces,
597 },
598 child_finished,
599 version,
600 )
601 }
602}
603
/// Response delivered to a parent execution when one of its child executions finishes.
#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    /// Execution that receives the response.
    pub(crate) parent_execution_id: ExecutionId,
    /// Join set of the parent that the `ChildExecutionFinished` response is addressed to.
    pub(crate) parent_join_set: JoinSetId,
    /// Final result of the child, the same value as in its `Finished` event.
    pub(crate) result: FinishedExecutionResult,
}
610
/// One execution event waiting to be written, optionally paired with a parent notification.
#[derive(Debug, Clone)]
pub(crate) struct Append {
    /// Timestamp of the batch append and of the parent response.
    pub(crate) created_at: DateTime<Utc>,
    /// Event appended to the log of `execution_id`.
    pub(crate) primary_event: AppendRequest,
    /// Execution whose log receives `primary_event`.
    pub(crate) execution_id: ExecutionId,
    /// Version passed to the append call (presumably the expected current version);
    /// also reported to the parent as the child's `finished_version`.
    pub(crate) version: Version,
    /// When set, `append` asserts that `primary_event` is `Finished` and that
    /// `execution_id` is `ExecutionId::Derived`.
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}
619
620impl Append {
621 pub(crate) async fn append(self, db_connection: &dyn DbConnection) -> Result<(), DbError> {
622 if let Some(child_finished) = self.child_finished {
623 assert_matches!(
624 &self.primary_event,
625 AppendRequest {
626 event: ExecutionEventInner::Finished { .. },
627 ..
628 }
629 );
630 let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
631 db_connection
632 .append_batch_respond_to_parent(
633 derived.clone(),
634 self.created_at,
635 vec![self.primary_event],
636 self.version.clone(),
637 child_finished.parent_execution_id,
638 JoinSetResponseEventOuter {
639 created_at: self.created_at,
640 event: JoinSetResponseEvent {
641 join_set_id: child_finished.parent_join_set,
642 event: JoinSetResponse::ChildExecutionFinished {
643 child_execution_id: derived,
644 finished_version: self.version,
646 result: child_finished.result,
647 },
648 },
649 },
650 )
651 .await?;
652 } else {
653 db_connection
654 .append_batch(
655 self.created_at,
656 vec![self.primary_event],
657 self.execution_id,
658 self.version,
659 )
660 .await?;
661 }
662 Ok(())
663 }
664}
665
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    //! A scripted [`Worker`] for tests.
    //!
    //! Every `run` pops the last entry of the shared script, asserts that the context's
    //! version and event history match the entry, and returns the scripted result.
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");

    /// Script of expected runs: expected version -> (expected event history, result).
    /// Entries are consumed from the back via `IndexMap::pop`.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        /// Shared script of expected runs.
        pub worker_results_rev: SimpleWorkerResultMap,
        /// The single function this worker exports.
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    /// Export metadata for `ffqn`: default parameter types, no return type, no extension,
    /// submittable.
    fn exported_metadata(ffqn: FunctionFqn) -> FunctionMetadata {
        FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: None,
            extension: None,
            submittable: true,
        }
    }

    impl SimpleWorker {
        /// A worker scripted for exactly one run at version 2 with an empty event history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let script = IndexMap::from([(Version::new(2), (vec![], res))]);
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(script)))
        }

        /// Replaces the exported function while keeping the script.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            let exported = [exported_metadata(ffqn.clone())];
            Self {
                worker_results_rev: self.worker_results_rev,
                ffqn,
                exported,
            }
        }

        /// A worker exporting `FFQN_SOME` that replays `worker_results_rev`.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: [exported_metadata(FFQN_SOME)],
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            // The mutex guard is a temporary and is released at the end of this statement.
            let next_run = self.worker_results_rev.lock().unwrap().pop();
            let (expected_version, (expected_eh, worker_result)) = next_run.unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}
745
746#[cfg(test)]
747mod tests {
748 use self::simple_worker::SimpleWorker;
749 use super::*;
750 use crate::worker::FatalError;
751 use crate::{expired_timers_watcher, worker::WorkerResult};
752 use assert_matches::assert_matches;
753 use async_trait::async_trait;
754 use concepts::storage::{CreateRequest, JoinSetRequest};
755 use concepts::storage::{ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState};
756 use concepts::time::Now;
757 use concepts::{
758 ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, StrVariant,
759 SupportedFunctionReturnValue, TrapKind,
760 };
761 use db_tests::Database;
762 use indexmap::IndexMap;
763 use simple_worker::FFQN_SOME;
764 use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
765 use test_utils::set_up;
766 use test_utils::sim_clock::{ConstClock, SimClock};
767
    /// Child function name, presumably used by the parent/child notification tests
    /// (its uses lie outside this view).
    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
769
770 async fn tick_fn<W: Worker + Debug, C: ClockFn + 'static>(
771 config: ExecConfig,
772 clock_fn: C,
773 db_pool: Arc<dyn DbPool>,
774 worker: Arc<W>,
775 executed_at: DateTime<Utc>,
776 ) -> ExecutionProgress {
777 trace!("Ticking with {worker:?}");
778 let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
779 let executor = ExecTask::new(worker, config, clock_fn, db_pool, ffqns);
780 let mut execution_progress = executor.tick(executed_at, RunId::generate()).await.unwrap();
781 loop {
782 execution_progress
783 .executions
784 .retain(|(_, abort_handle)| !abort_handle.is_finished());
785 if execution_progress.executions.is_empty() {
786 return execution_progress;
787 }
788 tokio::time::sleep(Duration::from_millis(10)).await;
789 }
790 }
791
792 #[tokio::test]
793 async fn execute_simple_lifecycle_tick_based_mem() {
794 let created_at = Now.now();
795 let (_guard, db_pool) = Database::Memory.set_up().await;
796 execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
797 db_pool.close().await.unwrap();
798 }
799
800 #[tokio::test]
801 async fn execute_simple_lifecycle_tick_based_sqlite() {
802 let created_at = Now.now();
803 let (_guard, db_pool) = Database::Sqlite.set_up().await;
804 execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
805 db_pool.close().await.unwrap();
806 }
807
808 async fn execute_simple_lifecycle_tick_based<C: ClockFn + 'static>(
809 pool: Arc<dyn DbPool>,
810 clock_fn: C,
811 ) {
812 set_up();
813 let created_at = clock_fn.now();
814 let exec_config = ExecConfig {
815 batch_size: 1,
816 lock_expiry: Duration::from_secs(1),
817 tick_sleep: Duration::from_millis(100),
818 component_id: ComponentId::dummy_activity(),
819 task_limiter: None,
820 executor_id: ExecutorId::generate(),
821 };
822
823 let execution_log = create_and_tick(
824 CreateAndTickConfig {
825 execution_id: ExecutionId::generate(),
826 created_at,
827 max_retries: 0,
828 executed_at: created_at,
829 retry_exp_backoff: Duration::ZERO,
830 },
831 clock_fn,
832 pool,
833 exec_config,
834 Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
835 SupportedFunctionReturnValue::None,
836 Version::new(2),
837 None,
838 ))),
839 tick_fn,
840 )
841 .await;
842 assert_matches!(
843 execution_log.events.get(2).unwrap(),
844 ExecutionEvent {
845 event: ExecutionEventInner::Finished {
846 result: Ok(SupportedFunctionReturnValue::None),
847 http_client_traces: None
848 },
849 created_at: _,
850 backtrace_id: None,
851 }
852 );
853 }
854
    /// Same lifecycle as `execute_simple_lifecycle_tick_based`, but the execution is processed
    /// by a spawned executor loop instead of a manual tick: the tick closure passed to
    /// `create_and_tick` only sleeps for one second. The result presumably depends on that
    /// timing, hence "stochastic".
    #[tokio::test]
    async fn stochastic_execute_simple_lifecycle_task_based_mem() {
        set_up();
        let created_at = Now.now();
        let clock_fn = ConstClock(created_at);
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
        };

        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
            SupportedFunctionReturnValue::None,
            Version::new(2),
            None,
        )));
        // The loop is started before the execution exists and picks it up on its own.
        let exec_task = ExecTask::spawn_new(
            worker.clone(),
            exec_config.clone(),
            clock_fn,
            db_pool.clone(),
        );

        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            db_pool.clone(),
            exec_config,
            worker,
            |_, _, _, _, _| async {
                // Give the spawned loop time to lock and run the execution.
                tokio::time::sleep(Duration::from_secs(1)).await;
                ExecutionProgress::default()
            },
        )
        .await;
        exec_task.close().await;
        db_pool.close().await.unwrap();
        assert_matches!(
            execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Ok(SupportedFunctionReturnValue::None),
                    http_client_traces: None
                },
                created_at: _,
                backtrace_id: None,
            }
        );
    }
914
    /// Parameters for `create_and_tick`: the execution to create and the tick timestamp.
    struct CreateAndTickConfig {
        execution_id: ExecutionId,
        /// Creation timestamp; also used as `scheduled_at`.
        created_at: DateTime<Utc>,
        max_retries: u32,
        /// Timestamp handed to the tick function.
        executed_at: DateTime<Utc>,
        retry_exp_backoff: Duration,
    }
922
    /// Creates an execution of `FFQN_SOME` as described by `config`, runs `tick` once and
    /// fetches the execution log.
    ///
    /// Asserts that event 0 is `Created` at `config.created_at`, event 1 is `Locked` no
    /// earlier than creation, and event 2 (of any kind) is no earlier than the lock.
    /// Returns the fetched log for further assertions.
    async fn create_and_tick<
        W: Worker,
        C: ClockFn,
        T: FnMut(ExecConfig, C, Arc<dyn DbPool>, Arc<W>, DateTime<Utc>) -> F,
        F: Future<Output = ExecutionProgress>,
    >(
        config: CreateAndTickConfig,
        clock_fn: C,
        db_pool: Arc<dyn DbPool>,
        exec_config: ExecConfig,
        worker: Arc<W>,
        mut tick: T,
    ) -> ExecutionLog {
        let db_connection = db_pool.connection();
        db_connection
            .create(CreateRequest {
                created_at: config.created_at,
                execution_id: config.execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: config.created_at,
                retry_exp_backoff: config.retry_exp_backoff,
                max_retries: config.max_retries,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();
        tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
        debug!("Execution history after tick: {execution_log:?}");
        assert_matches!(
            execution_log.events.first().unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Created { .. },
                created_at: actually_created_at,
                backtrace_id: None,
            }
            if config.created_at == *actually_created_at
        );
        let locked_at = assert_matches!(
            execution_log.events.get(1).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: locked_at,
                backtrace_id: None,
            } if config.created_at <= *locked_at
            => *locked_at
        );
        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
            event: _,
            created_at: executed_at,
            backtrace_id: None,
        } if *executed_at >= locked_at);
        execution_log
    }
984
985 #[tokio::test]
986 async fn activity_trap_should_trigger_an_execution_retry() {
987 set_up();
988 let sim_clock = SimClock::default();
989 let (_guard, db_pool) = Database::Memory.set_up().await;
990 let exec_config = ExecConfig {
991 batch_size: 1,
992 lock_expiry: Duration::from_secs(1),
993 tick_sleep: Duration::ZERO,
994 component_id: ComponentId::dummy_activity(),
995 task_limiter: None,
996 executor_id: ExecutorId::generate(),
997 };
998 let expected_reason = "error reason";
999 let expected_detail = "error detail";
1000 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1001 WorkerError::ActivityTrap {
1002 reason: expected_reason.to_string(),
1003 trap_kind: concepts::TrapKind::Trap,
1004 detail: expected_detail.to_string(),
1005 version: Version::new(2),
1006 http_client_traces: None,
1007 },
1008 )));
1009 let retry_exp_backoff = Duration::from_millis(100);
1010 debug!(now = %sim_clock.now(), "Creating an execution that should fail");
1011 let execution_log = create_and_tick(
1012 CreateAndTickConfig {
1013 execution_id: ExecutionId::generate(),
1014 created_at: sim_clock.now(),
1015 max_retries: 1,
1016 executed_at: sim_clock.now(),
1017 retry_exp_backoff,
1018 },
1019 sim_clock.clone(),
1020 db_pool.clone(),
1021 exec_config.clone(),
1022 worker,
1023 tick_fn,
1024 )
1025 .await;
1026 assert_eq!(3, execution_log.events.len());
1027 {
1028 let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
1029 &execution_log.events.get(2).unwrap(),
1030 ExecutionEvent {
1031 event: ExecutionEventInner::TemporarilyFailed {
1032 reason_inner,
1033 reason_full,
1034 detail,
1035 backoff_expires_at,
1036 http_client_traces: None,
1037 },
1038 created_at: at,
1039 backtrace_id: None,
1040 }
1041 => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
1042 );
1043 assert_eq!(expected_reason, reason_inner.deref());
1044 assert_eq!(
1045 format!("activity trap: {expected_reason}"),
1046 reason_full.deref()
1047 );
1048 assert_eq!(Some(expected_detail), detail.as_deref());
1049 assert_eq!(at, sim_clock.now());
1050 assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
1051 }
1052 let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
1053 std::sync::Mutex::new(IndexMap::from([(
1054 Version::new(4),
1055 (
1056 vec![],
1057 WorkerResult::Ok(SupportedFunctionReturnValue::None, Version::new(4), None),
1058 ),
1059 )])),
1060 )));
1061 assert!(
1063 tick_fn(
1064 exec_config.clone(),
1065 sim_clock.clone(),
1066 db_pool.clone(),
1067 worker.clone(),
1068 sim_clock.now(),
1069 )
1070 .await
1071 .executions
1072 .is_empty()
1073 );
1074 sim_clock.move_time_forward(retry_exp_backoff);
1076 tick_fn(
1077 exec_config,
1078 sim_clock.clone(),
1079 db_pool.clone(),
1080 worker,
1081 sim_clock.now(),
1082 )
1083 .await;
1084 let execution_log = {
1085 let db_connection = db_pool.connection();
1086 db_connection
1087 .get(&execution_log.execution_id)
1088 .await
1089 .unwrap()
1090 };
1091 debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
1092 assert_matches!(
1093 execution_log.events.get(3).unwrap(),
1094 ExecutionEvent {
1095 event: ExecutionEventInner::Locked { .. },
1096 created_at: at,
1097 backtrace_id: None,
1098 } if *at == sim_clock.now()
1099 );
1100 assert_matches!(
1101 execution_log.events.get(4).unwrap(),
1102 ExecutionEvent {
1103 event: ExecutionEventInner::Finished {
1104 result: Ok(SupportedFunctionReturnValue::None),
1105 http_client_traces: None
1106 },
1107 created_at: finished_at,
1108 backtrace_id: None,
1109 } if *finished_at == sim_clock.now()
1110 );
1111 db_pool.close().await.unwrap();
1112 }
1113
    /// With `max_retries: 0`, an activity trap must finish the execution immediately with a
    /// `PermanentFailure` of kind `ActivityTrap`, keeping the trap's reason and detail.
    #[tokio::test]
    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
        set_up();
        let created_at = Now.now();
        let clock_fn = ConstClock(created_at);
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
        };

        let expected_reason = "error reason";
        let expected_detail = "error detail";
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: expected_detail.to_string(),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            db_pool.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        // Created, Locked, Finished: no retry was scheduled.
        assert_eq!(3, execution_log.events.len());
        let (reason, kind, detail) = assert_matches!(
            &execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished{
                    result: Err(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_}),
                    http_client_traces: None
                },
                created_at: at,
                backtrace_id: None,
            } if *at == created_at
            => (reason_inner, kind, detail)
        );
        assert_eq!(expected_reason, *reason);
        assert_eq!(Some(expected_detail), detail.as_deref());
        assert_eq!(PermanentFailureKind::ActivityTrap, *kind);

        db_pool.close().await.unwrap();
    }
1174
1175 #[tokio::test]
1176 async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1177 let worker_error = WorkerError::ActivityTrap {
1178 reason: "error reason".to_string(),
1179 trap_kind: TrapKind::Trap,
1180 detail: "detail".to_string(),
1181 version: Version::new(2),
1182 http_client_traces: None,
1183 };
1184 let expected_child_err = FinishedExecutionError::PermanentFailure {
1185 reason_full: "activity trap: error reason".to_string(),
1186 reason_inner: "error reason".to_string(),
1187 kind: PermanentFailureKind::ActivityTrap,
1188 detail: Some("detail".to_string()),
1189 };
1190 child_execution_permanently_failed_should_notify_parent(
1191 WorkerResult::Err(worker_error),
1192 expected_child_err,
1193 )
1194 .await;
1195 }
1196
1197 #[tokio::test]
1200 async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1201 let expected_child_err = FinishedExecutionError::PermanentTimeout;
1202 child_execution_permanently_failed_should_notify_parent(
1203 WorkerResult::DbUpdatedByWorkerOrWatcher,
1204 expected_child_err,
1205 )
1206 .await;
1207 }
1208
1209 #[tokio::test]
1210 async fn child_execution_permanently_failed_should_notify_parent_unhandled_child() {
1211 let parent_id = ExecutionId::from_parts(1, 1);
1212 let join_set_id_outer =
1213 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("outer")).unwrap();
1214 let root_cause_id = parent_id.next_level(&join_set_id_outer);
1215 let join_set_id_inner =
1216 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("inner")).unwrap();
1217 let child_execution_id = root_cause_id.next_level(&join_set_id_inner);
1218 let worker_error = WorkerError::FatalError(
1219 FatalError::UnhandledChildExecutionError {
1220 child_execution_id: child_execution_id.clone(),
1221 root_cause_id: root_cause_id.clone(),
1222 },
1223 Version::new(2),
1224 );
1225 let expected_child_err = FinishedExecutionError::UnhandledChildExecutionError {
1226 child_execution_id,
1227 root_cause_id,
1228 };
1229 child_execution_permanently_failed_should_notify_parent(
1230 WorkerResult::Err(worker_error),
1231 expected_child_err,
1232 )
1233 .await;
1234 }
1235
    /// Shared scenario for the `child_execution_permanently_failed_should_notify_parent_*`
    /// tests: a parent execution with a single child, where the child ends with
    /// `expected_child_err`.
    ///
    /// 1. Creates the parent (`FFQN_SOME`) and ticks it once with a worker returning
    ///    `WorkerResult::DbUpdatedByWorkerOrWatcher`.
    /// 2. Appends to the parent, in one batch, `JoinSetCreate`, a `ChildExecutionRequest`
    ///    and `JoinNext`, creating the child execution (`FFQN_CHILD`) in the same call.
    /// 3. Ticks the child with a worker returning `worker_result`. If `expected_child_err`
    ///    is `PermanentTimeout`, advances the clock by `LOCK_EXPIRY` and runs the
    ///    expired-timers watcher.
    /// 4. Asserts that the child's last event is `Finished` with `Err(expected_child_err)`.
    ///    Also asserts that the parent is `PendingAt` the current time, and that the
    ///    parent's last response is `ChildExecutionFinished` for this child with an
    ///    `Err` result.
    async fn child_execution_permanently_failed_should_notify_parent(
        worker_result: WorkerResult,
        expected_child_err: FinishedExecutionError,
    ) {
        use concepts::storage::JoinSetResponseEventOuter;
        const LOCK_EXPIRY: Duration = Duration::from_secs(1);

        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;

        // Parent: created, then ticked once with a worker that reports
        // `DbUpdatedByWorkerOrWatcher` instead of a result.
        let parent_worker = Arc::new(SimpleWorker::with_single_result(
            WorkerResult::DbUpdatedByWorkerOrWatcher,
        ));
        let parent_execution_id = ExecutionId::generate();
        db_pool
            .connection()
            .create(CreateRequest {
                created_at: sim_clock.now(),
                execution_id: parent_execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: Duration::ZERO,
                max_retries: 0,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();
        tick_fn(
            ExecConfig {
                batch_size: 1,
                lock_expiry: LOCK_EXPIRY,
                tick_sleep: Duration::ZERO,
                component_id: ComponentId::dummy_activity(),
                task_limiter: None,
                executor_id: ExecutorId::generate(),
            },
            sim_clock.clone(),
            db_pool.clone(),
            parent_worker,
            sim_clock.now(),
        )
        .await;

        // The child's id is derived from the parent's id and a one-off join set.
        let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
        let child_execution_id = parent_execution_id.next_level(&join_set_id);
        // Append three history events to the parent in a single batch at parent version 2:
        // create the join set, request the child, then `JoinNext` on that join set.
        // The child execution is created in the same call.
        {
            let child = CreateRequest {
                created_at: sim_clock.now(),
                execution_id: ExecutionId::Derived(child_execution_id.clone()),
                ffqn: FFQN_CHILD,
                params: Params::empty(),
                parent: Some((parent_execution_id.clone(), join_set_id.clone())),
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: Duration::ZERO,
                max_retries: 0,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            };
            let current_time = sim_clock.now();
            let join_set = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinSetCreate {
                        join_set_id: join_set_id.clone(),
                        closing_strategy: ClosingStrategy::Complete,
                    },
                },
            };
            let child_exec_req = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinSetRequest {
                        join_set_id: join_set_id.clone(),
                        request: JoinSetRequest::ChildExecutionRequest {
                            child_execution_id: child_execution_id.clone(),
                        },
                    },
                },
            };
            let join_next = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinNext {
                        join_set_id: join_set_id.clone(),
                        run_expires_at: sim_clock.now(),
                        closing: false,
                    },
                },
            };
            db_pool
                .connection()
                .append_batch_create_new_execution(
                    current_time,
                    vec![join_set, child_exec_req, join_next],
                    parent_execution_id.clone(),
                    Version::new(2),
                    vec![child],
                )
                .await
                .unwrap();
        }

        // Child: ticked once with a worker that yields `worker_result`.
        let child_worker =
            Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));

        tick_fn(
            ExecConfig {
                batch_size: 1,
                lock_expiry: LOCK_EXPIRY,
                tick_sleep: Duration::ZERO,
                component_id: ComponentId::dummy_activity(),
                task_limiter: None,
                executor_id: ExecutorId::generate(),
            },
            sim_clock.clone(),
            db_pool.clone(),
            child_worker,
            sim_clock.now(),
        )
        .await;
        // Timeout scenario: move past the child's lock expiry and let the expired-timers
        // watcher act. The assertions below expect the child to be finished
        // with `PermanentTimeout` afterwards.
        if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
            sim_clock.move_time_forward(LOCK_EXPIRY);
            expired_timers_watcher::tick(db_pool.connection().as_ref(), sim_clock.now())
                .await
                .unwrap();
        }
        // The child must be finished, and its last event must carry exactly `expected_child_err`.
        let child_log = db_pool
            .connection()
            .get(&ExecutionId::Derived(child_execution_id.clone()))
            .await
            .unwrap();
        assert!(child_log.pending_state.is_finished());
        assert_eq!(
            Version(2),
            child_log.next_version,
            "created = 0, locked = 1, with_single_result = 2"
        );
        assert_eq!(
            ExecutionEventInner::Finished {
                result: Err(expected_child_err),
                http_client_traces: None
            },
            child_log.last_event().event
        );
        // The parent must be pending again at the current time.
        let parent_log = db_pool
            .connection()
            .get(&parent_execution_id)
            .await
            .unwrap();
        assert_matches!(
            parent_log.pending_state,
            PendingState::PendingAt {
                scheduled_at
            } if scheduled_at == sim_clock.now(),
            "parent should be back to pending"
        );
        // The parent's latest response must be a `ChildExecutionFinished` created now,
        // referring to this child and join set.
        let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
            parent_log.responses.last(),
            Some(JoinSetResponseEventOuter{
                created_at: at,
                event: JoinSetResponseEvent{
                    join_set_id: found_join_set_id,
                    event: JoinSetResponse::ChildExecutionFinished {
                        child_execution_id: found_child_execution_id,
                        finished_version,
                        result: found_result,
                    }
                }
            })
            if *at == sim_clock.now()
            => (found_join_set_id, found_child_execution_id, finished_version, found_result)
        );
        assert_eq!(join_set_id, *found_join_set_id);
        assert_eq!(child_execution_id, *found_child_execution_id);
        assert_eq!(child_log.next_version, *child_finished_version);
        // Only the error-ness of the forwarded result is checked here; the exact error
        // was asserted on the child's own log above.
        assert!(found_result.is_err());

        db_pool.close().await.unwrap();
    }
1424
    /// Test worker that sleeps for `duration`, using `tokio::time::sleep` (real time,
    /// not the simulated clock). It then succeeds with a clone of `result`.
    #[derive(Clone, Debug)]
    struct SleepyWorker {
        /// How long each `run` call sleeps before returning.
        duration: Duration,
        /// Value returned in `WorkerResult::Ok` after the sleep.
        result: SupportedFunctionReturnValue,
        /// The single function reported by `exported_functions`.
        exported: [FunctionMetadata; 1],
    }
1431
1432 #[async_trait]
1433 impl Worker for SleepyWorker {
1434 async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1435 tokio::time::sleep(self.duration).await;
1436 WorkerResult::Ok(self.result.clone(), ctx.version, None)
1437 }
1438
1439 fn exported_functions(&self) -> &[FunctionMetadata] {
1440 &self.exported
1441 }
1442 }
1443
1444 #[tokio::test]
1445 async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1446 set_up();
1447 let sim_clock = SimClock::default();
1448 let (_guard, db_pool) = Database::Memory.set_up().await;
1449 let lock_expiry = Duration::from_millis(100);
1450 let exec_config = ExecConfig {
1451 batch_size: 1,
1452 lock_expiry,
1453 tick_sleep: Duration::ZERO,
1454 component_id: ComponentId::dummy_activity(),
1455 task_limiter: None,
1456 executor_id: ExecutorId::generate(),
1457 };
1458
1459 let worker = Arc::new(SleepyWorker {
1460 duration: lock_expiry + Duration::from_millis(1), result: SupportedFunctionReturnValue::None,
1462 exported: [FunctionMetadata {
1463 ffqn: FFQN_SOME,
1464 parameter_types: ParameterTypes::default(),
1465 return_type: None,
1466 extension: None,
1467 submittable: true,
1468 }],
1469 });
1470 let execution_id = ExecutionId::generate();
1472 let timeout_duration = Duration::from_millis(300);
1473 let db_connection = db_pool.connection();
1474 db_connection
1475 .create(CreateRequest {
1476 created_at: sim_clock.now(),
1477 execution_id: execution_id.clone(),
1478 ffqn: FFQN_SOME,
1479 params: Params::empty(),
1480 parent: None,
1481 metadata: concepts::ExecutionMetadata::empty(),
1482 scheduled_at: sim_clock.now(),
1483 retry_exp_backoff: timeout_duration,
1484 max_retries: 1,
1485 component_id: ComponentId::dummy_activity(),
1486 scheduled_by: None,
1487 })
1488 .await
1489 .unwrap();
1490
1491 let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
1492 let executor = ExecTask::new(
1493 worker,
1494 exec_config,
1495 sim_clock.clone(),
1496 db_pool.clone(),
1497 ffqns,
1498 );
1499 let mut first_execution_progress = executor
1500 .tick(sim_clock.now(), RunId::generate())
1501 .await
1502 .unwrap();
1503 assert_eq!(1, first_execution_progress.executions.len());
1504 sim_clock.move_time_forward(lock_expiry);
1506 let now_after_first_lock_expiry = sim_clock.now();
1508 {
1509 debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1510 let cleanup_progress = executor
1511 .tick(now_after_first_lock_expiry, RunId::generate())
1512 .await
1513 .unwrap();
1514 assert!(cleanup_progress.executions.is_empty());
1515 }
1516 {
1517 let expired_locks = expired_timers_watcher::tick(
1518 db_pool.connection().as_ref(),
1519 now_after_first_lock_expiry,
1520 )
1521 .await
1522 .unwrap()
1523 .expired_locks;
1524 assert_eq!(1, expired_locks);
1525 }
1526 assert!(
1527 !first_execution_progress
1528 .executions
1529 .pop()
1530 .unwrap()
1531 .1
1532 .is_finished()
1533 );
1534
1535 let execution_log = db_connection.get(&execution_id).await.unwrap();
1536 let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1537 assert_matches!(
1538 &execution_log.events.get(2).unwrap(),
1539 ExecutionEvent {
1540 event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
1541 created_at: at,
1542 backtrace_id: None,
1543 } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1544 );
1545 assert_eq!(
1546 PendingState::PendingAt {
1547 scheduled_at: expected_first_timeout_expiry
1548 },
1549 execution_log.pending_state
1550 );
1551 sim_clock.move_time_forward(timeout_duration);
1552 let now_after_first_timeout = sim_clock.now();
1553 debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1554
1555 let mut second_execution_progress = executor
1556 .tick(now_after_first_timeout, RunId::generate())
1557 .await
1558 .unwrap();
1559 assert_eq!(1, second_execution_progress.executions.len());
1560
1561 sim_clock.move_time_forward(lock_expiry);
1563 let now_after_second_lock_expiry = sim_clock.now();
1565 debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1566 {
1567 let cleanup_progress = executor
1568 .tick(now_after_second_lock_expiry, RunId::generate())
1569 .await
1570 .unwrap();
1571 assert!(cleanup_progress.executions.is_empty());
1572 }
1573 {
1574 let expired_locks = expired_timers_watcher::tick(
1575 db_pool.connection().as_ref(),
1576 now_after_second_lock_expiry,
1577 )
1578 .await
1579 .unwrap()
1580 .expired_locks;
1581 assert_eq!(1, expired_locks);
1582 }
1583 assert!(
1584 !second_execution_progress
1585 .executions
1586 .pop()
1587 .unwrap()
1588 .1
1589 .is_finished()
1590 );
1591
1592 drop(db_connection);
1593 drop(executor);
1594 db_pool.close().await.unwrap();
1595 }
1596}