1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::storage::{
5 AppendRequest, DbPool, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
6 LockedExecution,
7};
8use concepts::time::ClockFn;
9use concepts::{ComponentId, FinishedExecutionResult, FunctionMetadata, StrVariant};
10use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
11use concepts::{
12 FinishedExecutionError,
13 storage::{DbConnection, DbError, ExecutionEventInner, JoinSetResponse, Version},
14};
15use concepts::{JoinSetId, PermanentFailureKind};
16use std::fmt::Display;
17use std::marker::PhantomData;
18use std::{
19 sync::{
20 Arc,
21 atomic::{AtomicBool, Ordering},
22 },
23 time::Duration,
24};
25use tokio::task::{AbortHandle, JoinHandle};
26use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
27
/// Configuration of a single executor (see `ExecTask`).
#[derive(Debug, Clone)]
pub struct ExecConfig {
    /// Added to the tick time to obtain both the lock expiry passed to `lock_pending`
    /// and the execution deadline handed to workers.
    pub lock_expiry: Duration,
    /// Passed to `wait_for_pending` after every tick of the spawned executor loop
    /// (presumably the maximum wait for new pending executions — confirm in `DbConnection`).
    pub tick_sleep: Duration,
    /// Maximum number of executions locked in one tick.
    pub batch_size: u32,
    /// Passed to `lock_pending` and recorded in log/span fields.
    pub component_id: ComponentId,
    /// Optional shared semaphore limiting concurrently running worker tasks; each spawned
    /// execution holds one permit until it completes. `None` means only `batch_size`
    /// limits a single tick.
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
}
36
/// Executor that locks pending executions of the worker's exported functions and runs
/// each of them on `worker` in its own Tokio task.
pub struct ExecTask<C: ClockFn, DB: DbConnection, P: DbPool<DB>> {
    worker: Arc<dyn Worker>,
    config: ExecConfig,
    clock_fn: C,
    db_pool: P,
    executor_id: ExecutorId,
    // Only binds the `DB` type parameter; no `DB` value is stored.
    phantom_data: PhantomData<DB>,
    // Functions whose pending executions this executor asks the DB to lock.
    ffqns: Arc<[FunctionFqn]>,
}
46
/// Worker tasks spawned by one tick, each paired with the id of the execution it runs.
#[derive(derive_more::Debug, Default)]
pub struct ExecutionProgress {
    // Omitted from `Debug` output. Only read by `wait_for_tasks` (behind the `test`
    // feature) and by in-crate tests, hence the `dead_code` allowance.
    #[debug(skip)]
    #[allow(dead_code)]
    executions: Vec<(ExecutionId, JoinHandle<()>)>,
}
53
54impl ExecutionProgress {
55 #[cfg(feature = "test")]
56 pub async fn wait_for_tasks(self) -> Result<usize, tokio::task::JoinError> {
57 let execs = self.executions.len();
58 for (_, handle) in self.executions {
59 handle.await?;
60 }
61 Ok(execs)
62 }
63}
64
/// Handle to an executor loop spawned by `ExecTask::spawn_new`.
///
/// `close` requests a graceful shutdown and waits for it; dropping the handle while the
/// task is still running aborts the task.
#[derive(derive_more::Debug)]
pub struct ExecutorTaskHandle {
    // Set by `close`; the executor loop checks it once per iteration.
    #[debug(skip)]
    is_closing: Arc<AtomicBool>,
    // Used to detect completion and to abort the task on drop.
    #[debug(skip)]
    abort_handle: AbortHandle,
    component_id: ComponentId,
    executor_id: ExecutorId,
}
74
75impl ExecutorTaskHandle {
76 #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
77 pub async fn close(&self) {
78 trace!("Gracefully closing");
79 self.is_closing.store(true, Ordering::Relaxed);
80 while !self.abort_handle.is_finished() {
81 tokio::time::sleep(Duration::from_millis(1)).await;
82 }
83 debug!("Gracefully closed");
84 }
85}
86
87impl Drop for ExecutorTaskHandle {
88 #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
89 fn drop(&mut self) {
90 if self.abort_handle.is_finished() {
91 return;
92 }
93 warn!(executor_id= %self.executor_id, component_id=%self.component_id, "Aborting the executor task");
94 self.abort_handle.abort();
95 }
96}
97
98#[cfg(feature = "test")]
99pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
100 extract_exported_ffqns_noext(worker)
101}
102
103fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
104 worker
105 .exported_functions()
106 .iter()
107 .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
108 .collect::<Arc<_>>()
109}
110
111impl<C: ClockFn + 'static, DB: DbConnection + 'static, P: DbPool<DB> + 'static> ExecTask<C, DB, P> {
112 #[cfg(feature = "test")]
113 pub fn new(
114 worker: Arc<dyn Worker>,
115 config: ExecConfig,
116 clock_fn: C,
117 db_pool: P,
118 ffqns: Arc<[FunctionFqn]>,
119 ) -> Self {
120 Self {
121 worker,
122 config,
123 executor_id: ExecutorId::generate(),
124 db_pool,
125 phantom_data: PhantomData,
126 ffqns,
127 clock_fn,
128 }
129 }
130
131 pub fn spawn_new(
132 worker: Arc<dyn Worker>,
133 config: ExecConfig,
134 clock_fn: C,
135 db_pool: P,
136 executor_id: ExecutorId,
137 ) -> ExecutorTaskHandle {
138 let is_closing = Arc::new(AtomicBool::default());
139 let is_closing_inner = is_closing.clone();
140 let ffqns = extract_exported_ffqns_noext(worker.as_ref());
141 let component_id = config.component_id.clone();
142 let abort_handle = tokio::spawn(async move {
143 debug!(%executor_id, component_id = %config.component_id, "Spawned executor");
144 let task = Self {
145 worker,
146 config,
147 executor_id,
148 db_pool,
149 phantom_data: PhantomData,
150 ffqns: ffqns.clone(),
151 clock_fn: clock_fn.clone(),
152 };
153 loop {
154 let _ = task.tick(clock_fn.now()).await;
155 let executed_at = clock_fn.now();
156 task.db_pool
157 .connection()
158 .wait_for_pending(executed_at, ffqns.clone(), task.config.tick_sleep)
159 .await;
160 if is_closing_inner.load(Ordering::Relaxed) {
161 return;
162 }
163 }
164 })
165 .abort_handle();
166 ExecutorTaskHandle {
167 is_closing,
168 abort_handle,
169 component_id,
170 executor_id,
171 }
172 }
173
174 fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
175 if let Some(task_limiter) = &self.config.task_limiter {
176 let mut locks = Vec::new();
177 for _ in 0..self.config.batch_size {
178 if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
179 locks.push(Some(permit));
180 } else {
181 break;
182 }
183 }
184 locks
185 } else {
186 let mut vec = Vec::with_capacity(self.config.batch_size as usize);
187 for _ in 0..self.config.batch_size {
188 vec.push(None);
189 }
190 vec
191 }
192 }
193
194 #[cfg(feature = "test")]
195 pub async fn tick_test(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
196 self.tick(executed_at).await
197 }
198
199 #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.executor_id, component_id = %self.config.component_id))]
200 async fn tick(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
201 let locked_executions = {
202 let mut permits = self.acquire_task_permits();
203 if permits.is_empty() {
204 return Ok(ExecutionProgress::default());
205 }
206 let db_connection = self.db_pool.connection();
207 let lock_expires_at = executed_at + self.config.lock_expiry;
208 let locked_executions = db_connection
209 .lock_pending(
210 permits.len(), executed_at, self.ffqns.clone(),
213 executed_at, self.config.component_id.clone(),
215 self.executor_id,
216 lock_expires_at,
217 )
218 .await
219 .map_err(|err| {
220 warn!(executor_id = %self.executor_id, component_id = %self.config.component_id, "lock_pending error {err:?}");
221 })?;
222 while permits.len() > locked_executions.len() {
224 permits.pop();
225 }
226 assert_eq!(permits.len(), locked_executions.len());
227 locked_executions.into_iter().zip(permits)
228 };
229 let execution_deadline = executed_at + self.config.lock_expiry;
230
231 let mut executions = Vec::with_capacity(locked_executions.len());
232 for (locked_execution, permit) in locked_executions {
233 let execution_id = locked_execution.execution_id.clone();
234 let join_handle = {
235 let worker = self.worker.clone();
236 let db_pool = self.db_pool.clone();
237 let clock_fn = self.clock_fn.clone();
238 let run_id = locked_execution.run_id;
239 let worker_span = info_span!(parent: None, "worker",
240 "otel.name" = format!("worker {}", locked_execution.ffqn),
241 %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.executor_id, component_id = %self.config.component_id);
242 locked_execution.metadata.enrich(&worker_span);
243 tokio::spawn({
244 let worker_span2 = worker_span.clone();
245 async move {
246 let res = Self::run_worker(
247 worker,
248 &db_pool,
249 execution_deadline,
250 clock_fn,
251 locked_execution,
252 worker_span2,
253 )
254 .await;
255 if let Err(db_error) = res {
256 error!("Execution will be timed out not writing `{db_error:?}`");
257 }
258 drop(permit);
259 }
260 .instrument(worker_span)
261 })
262 };
263 executions.push((execution_id, join_handle));
264 }
265 Ok(ExecutionProgress { executions })
266 }
267
268 async fn run_worker(
269 worker: Arc<dyn Worker>,
270 db_pool: &P,
271 execution_deadline: DateTime<Utc>,
272 clock_fn: C,
273 locked_execution: LockedExecution,
274 worker_span: Span,
275 ) -> Result<(), DbError> {
276 debug!("Worker::run starting");
277 trace!(
278 version = %locked_execution.version,
279 params = ?locked_execution.params,
280 event_history = ?locked_execution.event_history,
281 "Worker::run starting"
282 );
283 let can_be_retried = ExecutionLog::can_be_retried_after(
284 locked_execution.temporary_event_count + 1,
285 locked_execution.max_retries,
286 locked_execution.retry_exp_backoff,
287 );
288 let unlock_expiry_on_limit_reached =
289 ExecutionLog::compute_retry_duration_when_retrying_forever(
290 locked_execution.temporary_event_count + 1,
291 locked_execution.retry_exp_backoff,
292 );
293 let ctx = WorkerContext {
294 execution_id: locked_execution.execution_id.clone(),
295 metadata: locked_execution.metadata,
296 ffqn: locked_execution.ffqn,
297 params: locked_execution.params,
298 event_history: locked_execution.event_history,
299 responses: locked_execution
300 .responses
301 .into_iter()
302 .map(|outer| outer.event)
303 .collect(),
304 version: locked_execution.version,
305 execution_deadline,
306 can_be_retried: can_be_retried.is_some(),
307 run_id: locked_execution.run_id,
308 worker_span,
309 };
310 let worker_result = worker.run(ctx).await;
311 trace!(?worker_result, "Worker::run finished");
312 let result_obtained_at = clock_fn.now();
313 match Self::worker_result_to_execution_event(
314 locked_execution.execution_id,
315 worker_result,
316 result_obtained_at,
317 locked_execution.parent,
318 can_be_retried,
319 unlock_expiry_on_limit_reached,
320 )? {
321 Some(append) => {
322 let db_connection = db_pool.connection();
323 trace!("Appending {append:?}");
324 append.clone().append(&db_connection).await
325 }
326 None => Ok(()),
327 }
328 }
329
330 fn worker_result_to_execution_event(
333 execution_id: ExecutionId,
334 worker_result: WorkerResult,
335 result_obtained_at: DateTime<Utc>,
336 parent: Option<(ExecutionId, JoinSetId)>,
337 can_be_retried: Option<Duration>,
338 unlock_expiry_on_limit_reached: Duration,
339 ) -> Result<Option<Append>, DbError> {
340 Ok(match worker_result {
341 WorkerResult::Ok(result, new_version, http_client_traces) => {
342 info!(
343 "Execution finished: {}",
344 result.as_pending_state_finished_result()
345 );
346 let child_finished =
347 parent.map(
348 |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
349 parent_execution_id,
350 parent_join_set,
351 result: Ok(result.clone()),
352 },
353 );
354 let primary_event = AppendRequest {
355 created_at: result_obtained_at,
356 event: ExecutionEventInner::Finished {
357 result: Ok(result),
358 http_client_traces,
359 },
360 };
361
362 Some(Append {
363 created_at: result_obtained_at,
364 primary_event,
365 execution_id,
366 version: new_version,
367 child_finished,
368 })
369 }
370 WorkerResult::DbUpdatedByWorkerOrWatcher => None,
371 WorkerResult::Err(err) => {
372 let reason_full = err.to_string(); let (primary_event, child_finished, version) = match err {
374 WorkerError::TemporaryTimeout {
375 http_client_traces,
376 version,
377 } => {
378 if let Some(duration) = can_be_retried {
379 let backoff_expires_at = result_obtained_at + duration;
380 info!(
381 "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
382 );
383 (
384 ExecutionEventInner::TemporarilyTimedOut {
385 backoff_expires_at,
386 http_client_traces,
387 },
388 None,
389 version,
390 )
391 } else {
392 info!("Permanent timeout");
393 let result = Err(FinishedExecutionError::PermanentTimeout);
394 let child_finished =
395 parent.map(|(parent_execution_id, parent_join_set)| {
396 ChildFinishedResponse {
397 parent_execution_id,
398 parent_join_set,
399 result: result.clone(),
400 }
401 });
402 (
403 ExecutionEventInner::Finished {
404 result,
405 http_client_traces,
406 },
407 child_finished,
408 version,
409 )
410 }
411 }
412 WorkerError::DbError(db_error) => {
413 return Err(db_error);
414 }
415 WorkerError::ActivityTrap {
416 reason: reason_inner,
417 trap_kind,
418 detail,
419 version,
420 http_client_traces,
421 } => retry_or_fail(
422 result_obtained_at,
423 parent,
424 can_be_retried,
425 reason_full,
426 reason_inner,
427 trap_kind, Some(detail),
429 version,
430 http_client_traces,
431 ),
432 WorkerError::ActivityPreopenedDirError {
433 reason_kind,
434 reason_inner,
435 version,
436 } => retry_or_fail(
437 result_obtained_at,
438 parent,
439 can_be_retried,
440 reason_full,
441 reason_inner,
442 reason_kind, None, version,
445 None, ),
447 WorkerError::ActivityReturnedError {
448 detail,
449 version,
450 http_client_traces,
451 } => {
452 let duration = can_be_retried.expect(
453 "ActivityReturnedError must not be returned when retries are exhausted",
454 );
455 let expires_at = result_obtained_at + duration;
456 debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
457 (
458 ExecutionEventInner::TemporarilyFailed {
459 backoff_expires_at: expires_at,
460 reason_full: StrVariant::Static("activity returned error"), reason_inner: StrVariant::Static("activity returned error"),
462 detail, http_client_traces,
464 },
465 None,
466 version,
467 )
468 }
469 WorkerError::TemporaryWorkflowTrap {
470 reason: reason_inner,
471 kind,
472 detail,
473 version,
474 } => {
475 let duration = can_be_retried.expect("workflows are retried forever");
476 let expires_at = result_obtained_at + duration;
477 debug!(
478 "Retrying workflow {kind} execution after {duration:?} at {expires_at}"
479 );
480 (
481 ExecutionEventInner::TemporarilyFailed {
482 backoff_expires_at: expires_at,
483 reason_full: StrVariant::from(reason_full),
484 reason_inner: StrVariant::from(reason_inner),
485 detail,
486 http_client_traces: None,
487 },
488 None,
489 version,
490 )
491 }
492 WorkerError::LimitReached {
493 reason: inner_reason,
494 version: new_version,
495 } => {
496 let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
497 warn!(
498 "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
499 );
500 (
501 ExecutionEventInner::Unlocked {
502 backoff_expires_at: expires_at,
503 reason: StrVariant::from(reason_full),
504 },
505 None,
506 new_version,
507 )
508 }
509 WorkerError::FatalError(fatal_error, version) => {
510 info!("Fatal worker error - {fatal_error:?}");
511 let result = Err(FinishedExecutionError::from(fatal_error));
512 let child_finished =
513 parent.map(|(parent_execution_id, parent_join_set)| {
514 ChildFinishedResponse {
515 parent_execution_id,
516 parent_join_set,
517 result: result.clone(),
518 }
519 });
520 (
521 ExecutionEventInner::Finished {
522 result,
523 http_client_traces: None,
524 },
525 child_finished,
526 version,
527 )
528 }
529 };
530 Some(Append {
531 created_at: result_obtained_at,
532 primary_event: AppendRequest {
533 created_at: result_obtained_at,
534 event: primary_event,
535 },
536 execution_id,
537 version,
538 child_finished,
539 })
540 }
541 })
542 }
543}
544
545#[expect(clippy::too_many_arguments)]
546fn retry_or_fail(
547 result_obtained_at: DateTime<Utc>,
548 parent: Option<(ExecutionId, JoinSetId)>,
549 can_be_retried: Option<Duration>,
550 reason_full: String,
551 reason_inner: String,
552 err_kind_for_logs: impl Display,
553 detail: Option<String>,
554 version: Version,
555 http_client_traces: Option<Vec<concepts::storage::http_client_trace::HttpClientTrace>>,
556) -> (ExecutionEventInner, Option<ChildFinishedResponse>, Version) {
557 if let Some(duration) = can_be_retried {
558 let expires_at = result_obtained_at + duration;
559 debug!(
560 "Retrying activity with `{err_kind_for_logs}` execution after {duration:?} at {expires_at}"
561 );
562 (
563 ExecutionEventInner::TemporarilyFailed {
564 backoff_expires_at: expires_at,
565 reason_full: StrVariant::from(reason_full),
566 reason_inner: StrVariant::from(reason_inner),
567 detail,
568 http_client_traces,
569 },
570 None,
571 version,
572 )
573 } else {
574 info!("Activity with `{err_kind_for_logs}` marked as permanent failure - {reason_inner}");
575 let result = Err(FinishedExecutionError::PermanentFailure {
576 reason_inner,
577 reason_full,
578 kind: PermanentFailureKind::ActivityTrap,
579 detail,
580 });
581 let child_finished =
582 parent.map(
583 |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
584 parent_execution_id,
585 parent_join_set,
586 result: result.clone(),
587 },
588 );
589 (
590 ExecutionEventInner::Finished {
591 result,
592 http_client_traces,
593 },
594 child_finished,
595 version,
596 )
597 }
598}
599
/// Notification for a parent execution that one of its child executions has finished.
#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    pub(crate) parent_execution_id: ExecutionId,
    /// Join set of the parent that receives the `ChildExecutionFinished` response.
    pub(crate) parent_join_set: JoinSetId,
    /// Final result of the child, forwarded into the parent's response.
    pub(crate) result: FinishedExecutionResult,
}
606
/// A pending write of one execution event, optionally paired with a parent notification.
#[derive(Debug, Clone)]
pub(crate) struct Append {
    /// Timestamp for the appended batch and for the parent's response event.
    pub(crate) created_at: DateTime<Utc>,
    pub(crate) primary_event: AppendRequest,
    pub(crate) execution_id: ExecutionId,
    /// Version passed to the DB append; also reported as the child's `finished_version`.
    pub(crate) version: Version,
    /// When set, `primary_event` must be `Finished` and `execution_id` must be
    /// `ExecutionId::Derived`; `Append::append` asserts both.
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}
615
616impl Append {
617 pub(crate) async fn append(self, db_connection: &impl DbConnection) -> Result<(), DbError> {
618 if let Some(child_finished) = self.child_finished {
619 assert_matches!(
620 &self.primary_event,
621 AppendRequest {
622 event: ExecutionEventInner::Finished { .. },
623 ..
624 }
625 );
626 let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
627 db_connection
628 .append_batch_respond_to_parent(
629 derived.clone(),
630 self.created_at,
631 vec![self.primary_event],
632 self.version.clone(),
633 child_finished.parent_execution_id,
634 JoinSetResponseEventOuter {
635 created_at: self.created_at,
636 event: JoinSetResponseEvent {
637 join_set_id: child_finished.parent_join_set,
638 event: JoinSetResponse::ChildExecutionFinished {
639 child_execution_id: derived,
640 finished_version: self.version,
642 result: child_finished.result,
643 },
644 },
645 },
646 )
647 .await?;
648 } else {
649 db_connection
650 .append_batch(
651 self.created_at,
652 vec![self.primary_event],
653 self.execution_id,
654 self.version,
655 )
656 .await?;
657 }
658 Ok(())
659 }
660}
661
/// Scripted [`Worker`] used by tests: replays pre-recorded results and checks the
/// context it is run with.
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");

    /// Expected version and event history for each run, mapped to the result to return.
    /// Entries are consumed from the end (`IndexMap::pop`), hence the `_rev` naming.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    /// Builds the single-function export list advertised for `ffqn`.
    fn single_export(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
        [FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: None,
            extension: None,
            submittable: true,
        }]
    }

    impl SimpleWorker {
        /// Worker that runs once, expecting version 2 and an empty event history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let results: IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)> =
                IndexMap::from([(Version::new(2), (vec![], res))]);
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(results)))
        }

        /// Returns the same worker exporting `ffqn` instead.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            Self {
                exported: single_export(ffqn.clone()),
                ffqn,
                worker_results_rev: self.worker_results_rev,
            }
        }

        /// Worker exporting `FFQN_SOME` that replays `worker_results_rev`.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: single_export(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        /// Pops the last scripted entry, asserts the context's version and event history
        /// match it, and returns its result. Panics when no entries are left.
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            let (expected_version, (expected_eh, worker_result)) =
                self.worker_results_rev.lock().unwrap().pop().unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}
741
742#[cfg(test)]
743mod tests {
744 use self::simple_worker::SimpleWorker;
745 use super::*;
746 use crate::worker::FatalError;
747 use crate::{expired_timers_watcher, worker::WorkerResult};
748 use assert_matches::assert_matches;
749 use async_trait::async_trait;
750 use concepts::storage::{CreateRequest, JoinSetRequest};
751 use concepts::storage::{
752 DbConnection, ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState,
753 };
754 use concepts::time::Now;
755 use concepts::{
756 ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, StrVariant,
757 SupportedFunctionReturnValue, TrapKind,
758 };
759 use db_tests::Database;
760 use indexmap::IndexMap;
761 use simple_worker::FFQN_SOME;
762 use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
763 use test_utils::set_up;
764 use test_utils::sim_clock::{ConstClock, SimClock};
765
766 pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
767
768 async fn tick_fn<
769 W: Worker + Debug,
770 C: ClockFn + 'static,
771 DB: DbConnection + 'static,
772 P: DbPool<DB> + 'static,
773 >(
774 config: ExecConfig,
775 clock_fn: C,
776 db_pool: P,
777 worker: Arc<W>,
778 executed_at: DateTime<Utc>,
779 ) -> ExecutionProgress {
780 trace!("Ticking with {worker:?}");
781 let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
782 let executor = ExecTask::new(worker, config, clock_fn, db_pool, ffqns);
783 let mut execution_progress = executor.tick(executed_at).await.unwrap();
784 loop {
785 execution_progress
786 .executions
787 .retain(|(_, abort_handle)| !abort_handle.is_finished());
788 if execution_progress.executions.is_empty() {
789 return execution_progress;
790 }
791 tokio::time::sleep(Duration::from_millis(10)).await;
792 }
793 }
794
795 #[tokio::test]
796 async fn execute_simple_lifecycle_tick_based_mem() {
797 let created_at = Now.now();
798 let (_guard, db_pool) = Database::Memory.set_up().await;
799 execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
800 db_pool.close().await.unwrap();
801 }
802
803 #[cfg(not(madsim))]
804 #[tokio::test]
805 async fn execute_simple_lifecycle_tick_based_sqlite() {
806 let created_at = Now.now();
807 let (_guard, db_pool) = Database::Sqlite.set_up().await;
808 execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
809 db_pool.close().await.unwrap();
810 }
811
812 async fn execute_simple_lifecycle_tick_based<
813 DB: DbConnection + 'static,
814 P: DbPool<DB> + 'static,
815 C: ClockFn + 'static,
816 >(
817 pool: P,
818 clock_fn: C,
819 ) {
820 set_up();
821 let created_at = clock_fn.now();
822 let exec_config = ExecConfig {
823 batch_size: 1,
824 lock_expiry: Duration::from_secs(1),
825 tick_sleep: Duration::from_millis(100),
826 component_id: ComponentId::dummy_activity(),
827 task_limiter: None,
828 };
829
830 let execution_log = create_and_tick(
831 CreateAndTickConfig {
832 execution_id: ExecutionId::generate(),
833 created_at,
834 max_retries: 0,
835 executed_at: created_at,
836 retry_exp_backoff: Duration::ZERO,
837 },
838 clock_fn,
839 pool,
840 exec_config,
841 Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
842 SupportedFunctionReturnValue::None,
843 Version::new(2),
844 None,
845 ))),
846 tick_fn,
847 )
848 .await;
849 assert_matches!(
850 execution_log.events.get(2).unwrap(),
851 ExecutionEvent {
852 event: ExecutionEventInner::Finished {
853 result: Ok(SupportedFunctionReturnValue::None),
854 http_client_traces: None
855 },
856 created_at: _,
857 backtrace_id: None,
858 }
859 );
860 }
861
    /// Same lifecycle as `execute_simple_lifecycle_tick_based`, but driven by a background
    /// executor spawned with `ExecTask::spawn_new` instead of manual ticks.
    ///
    /// The tick closure handed to `create_and_tick` only sleeps for one second. The test
    /// therefore relies on the spawned executor picking up and finishing the execution
    /// within that time (presumably the reason for the "stochastic" name).
    #[tokio::test]
    async fn stochastic_execute_simple_lifecycle_task_based_mem() {
        set_up();
        let created_at = Now.now();
        let clock_fn = ConstClock(created_at);
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };

        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
            SupportedFunctionReturnValue::None,
            Version::new(2),
            None,
        )));
        // The spawned executor runs the worker; the closure below ignores its copy.
        let exec_task = ExecTask::spawn_new(
            worker.clone(),
            exec_config.clone(),
            clock_fn,
            db_pool.clone(),
            ExecutorId::generate(),
        );

        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            db_pool.clone(),
            exec_config,
            worker,
            |_, _, _, _, _| async {
                // Give the background executor time to process the new execution.
                tokio::time::sleep(Duration::from_secs(1)).await;
                ExecutionProgress::default()
            },
        )
        .await;
        exec_task.close().await;
        db_pool.close().await.unwrap();
        assert_matches!(
            execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Ok(SupportedFunctionReturnValue::None),
                    http_client_traces: None
                },
                created_at: _,
                backtrace_id: None,
            }
        );
    }
921
    /// Parameters for `create_and_tick`: the execution to create and when to tick it.
    struct CreateAndTickConfig {
        execution_id: ExecutionId,
        /// Used both as the creation time and as `scheduled_at`.
        created_at: DateTime<Utc>,
        max_retries: u32,
        /// Time passed to the tick function.
        executed_at: DateTime<Utc>,
        retry_exp_backoff: Duration,
    }
929
930 async fn create_and_tick<
931 W: Worker,
932 C: ClockFn,
933 DB: DbConnection,
934 P: DbPool<DB>,
935 T: FnMut(ExecConfig, C, P, Arc<W>, DateTime<Utc>) -> F,
936 F: Future<Output = ExecutionProgress>,
937 >(
938 config: CreateAndTickConfig,
939 clock_fn: C,
940 db_pool: P,
941 exec_config: ExecConfig,
942 worker: Arc<W>,
943 mut tick: T,
944 ) -> ExecutionLog {
945 let db_connection = db_pool.connection();
947 db_connection
948 .create(CreateRequest {
949 created_at: config.created_at,
950 execution_id: config.execution_id.clone(),
951 ffqn: FFQN_SOME,
952 params: Params::empty(),
953 parent: None,
954 metadata: concepts::ExecutionMetadata::empty(),
955 scheduled_at: config.created_at,
956 retry_exp_backoff: config.retry_exp_backoff,
957 max_retries: config.max_retries,
958 component_id: ComponentId::dummy_activity(),
959 scheduled_by: None,
960 })
961 .await
962 .unwrap();
963 tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
965 let execution_log = db_connection.get(&config.execution_id).await.unwrap();
966 debug!("Execution history after tick: {execution_log:?}");
967 assert_matches!(
969 execution_log.events.first().unwrap(),
970 ExecutionEvent {
971 event: ExecutionEventInner::Created { .. },
972 created_at: actually_created_at,
973 backtrace_id: None,
974 }
975 if config.created_at == *actually_created_at
976 );
977 let locked_at = assert_matches!(
978 execution_log.events.get(1).unwrap(),
979 ExecutionEvent {
980 event: ExecutionEventInner::Locked { .. },
981 created_at: locked_at,
982 backtrace_id: None,
983 } if config.created_at <= *locked_at
984 => *locked_at
985 );
986 assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
987 event: _,
988 created_at: executed_at,
989 backtrace_id: None,
990 } if *executed_at >= locked_at);
991 execution_log
992 }
993
    /// An activity trap with one retry left must produce `TemporarilyFailed` with the
    /// configured backoff. The execution must not be locked again before the backoff
    /// elapses, and it must be locked and finished once the simulated clock passes it.
    #[tokio::test]
    async fn activity_trap_should_trigger_an_execution_retry() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };
        let expected_reason = "error reason";
        let expected_detail = "error detail";
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: expected_detail.to_string(),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        let retry_exp_backoff = Duration::from_millis(100);
        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at: sim_clock.now(),
                max_retries: 1,
                executed_at: sim_clock.now(),
                retry_exp_backoff,
            },
            sim_clock.clone(),
            db_pool.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        // Created, Locked, TemporarilyFailed.
        assert_eq!(3, execution_log.events.len());
        {
            let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
                &execution_log.events.get(2).unwrap(),
                ExecutionEvent {
                    event: ExecutionEventInner::TemporarilyFailed {
                        reason_inner,
                        reason_full,
                        detail,
                        backoff_expires_at,
                        http_client_traces: None,
                    },
                    created_at: at,
                    backtrace_id: None,
                }
                => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
            );
            assert_eq!(expected_reason, reason_inner.deref());
            assert_eq!(
                format!("activity trap: {expected_reason}"),
                reason_full.deref()
            );
            assert_eq!(Some(expected_detail), detail.as_deref());
            assert_eq!(at, sim_clock.now());
            assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
        }
        // The retry is expected at version 4 (after Created, Locked, TemporarilyFailed).
        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
            std::sync::Mutex::new(IndexMap::from([(
                Version::new(4),
                (
                    vec![],
                    WorkerResult::Ok(SupportedFunctionReturnValue::None, Version::new(4), None),
                ),
            )])),
        )));
        // Still within the backoff: ticking must not lock the execution.
        assert!(
            tick_fn(
                exec_config.clone(),
                sim_clock.clone(),
                db_pool.clone(),
                worker.clone(),
                sim_clock.now(),
            )
            .await
            .executions
            .is_empty()
        );
        sim_clock.move_time_forward(retry_exp_backoff).await;
        // Backoff elapsed: this tick locks and runs the retry.
        tick_fn(
            exec_config,
            sim_clock.clone(),
            db_pool.clone(),
            worker,
            sim_clock.now(),
        )
        .await;
        let execution_log = {
            let db_connection = db_pool.connection();
            db_connection
                .get(&execution_log.execution_id)
                .await
                .unwrap()
        };
        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
        assert_matches!(
            execution_log.events.get(3).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: at,
                backtrace_id: None,
            } if *at == sim_clock.now()
        );
        assert_matches!(
            execution_log.events.get(4).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Ok(SupportedFunctionReturnValue::None),
                    http_client_traces: None
                },
                created_at: finished_at,
                backtrace_id: None,
            } if *finished_at == sim_clock.now()
        );
        db_pool.close().await.unwrap();
    }
1121
    /// With `max_retries: 0`, an activity trap must finish the execution immediately with
    /// `PermanentFailure` of kind `ActivityTrap`, preserving the reason and detail.
    #[tokio::test]
    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
        set_up();
        let created_at = Now.now();
        let clock_fn = ConstClock(created_at);
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };

        let expected_reason = "error reason";
        let expected_detail = "error detail";
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: expected_detail.to_string(),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            db_pool.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        // Created, Locked, Finished.
        assert_eq!(3, execution_log.events.len());
        let (reason, kind, detail) = assert_matches!(
            &execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished{
                    result: Err(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_}),
                    http_client_traces: None
                },
                created_at: at,
                backtrace_id: None,
            } if *at == created_at
            => (reason_inner, kind, detail)
        );
        assert_eq!(expected_reason, *reason);
        assert_eq!(Some(expected_detail), detail.as_deref());
        assert_eq!(PermanentFailureKind::ActivityTrap, *kind);

        db_pool.close().await.unwrap();
    }
1181
1182 #[tokio::test]
1183 async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1184 let worker_error = WorkerError::ActivityTrap {
1185 reason: "error reason".to_string(),
1186 trap_kind: TrapKind::Trap,
1187 detail: "detail".to_string(),
1188 version: Version::new(2),
1189 http_client_traces: None,
1190 };
1191 let expected_child_err = FinishedExecutionError::PermanentFailure {
1192 reason_full: "activity trap: error reason".to_string(),
1193 reason_inner: "error reason".to_string(),
1194 kind: PermanentFailureKind::ActivityTrap,
1195 detail: Some("detail".to_string()),
1196 };
1197 child_execution_permanently_failed_should_notify_parent(
1198 WorkerResult::Err(worker_error),
1199 expected_child_err,
1200 )
1201 .await;
1202 }
1203
1204 #[tokio::test]
1207 async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1208 let expected_child_err = FinishedExecutionError::PermanentTimeout;
1209 child_execution_permanently_failed_should_notify_parent(
1210 WorkerResult::DbUpdatedByWorkerOrWatcher,
1211 expected_child_err,
1212 )
1213 .await;
1214 }
1215
1216 #[tokio::test]
1217 async fn child_execution_permanently_failed_should_notify_parent_unhandled_child() {
1218 let parent_id = ExecutionId::from_parts(1, 1);
1219 let join_set_id_outer =
1220 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("outer")).unwrap();
1221 let root_cause_id = parent_id.next_level(&join_set_id_outer);
1222 let join_set_id_inner =
1223 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("inner")).unwrap();
1224 let child_execution_id = root_cause_id.next_level(&join_set_id_inner);
1225 let worker_error = WorkerError::FatalError(
1226 FatalError::UnhandledChildExecutionError {
1227 child_execution_id: child_execution_id.clone(),
1228 root_cause_id: root_cause_id.clone(),
1229 },
1230 Version::new(2),
1231 );
1232 let expected_child_err = FinishedExecutionError::UnhandledChildExecutionError {
1233 child_execution_id,
1234 root_cause_id,
1235 };
1236 child_execution_permanently_failed_should_notify_parent(
1237 WorkerResult::Err(worker_error),
1238 expected_child_err,
1239 )
1240 .await;
1241 }
1242
1243 async fn child_execution_permanently_failed_should_notify_parent(
1244 worker_result: WorkerResult,
1245 expected_child_err: FinishedExecutionError,
1246 ) {
1247 use concepts::storage::JoinSetResponseEventOuter;
1248 const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1249
1250 set_up();
1251 let sim_clock = SimClock::default();
1252 let (_guard, db_pool) = Database::Memory.set_up().await;
1253
1254 let parent_worker = Arc::new(SimpleWorker::with_single_result(
1255 WorkerResult::DbUpdatedByWorkerOrWatcher,
1256 ));
1257 let parent_execution_id = ExecutionId::generate();
1258 db_pool
1259 .connection()
1260 .create(CreateRequest {
1261 created_at: sim_clock.now(),
1262 execution_id: parent_execution_id.clone(),
1263 ffqn: FFQN_SOME,
1264 params: Params::empty(),
1265 parent: None,
1266 metadata: concepts::ExecutionMetadata::empty(),
1267 scheduled_at: sim_clock.now(),
1268 retry_exp_backoff: Duration::ZERO,
1269 max_retries: 0,
1270 component_id: ComponentId::dummy_activity(),
1271 scheduled_by: None,
1272 })
1273 .await
1274 .unwrap();
1275 tick_fn(
1276 ExecConfig {
1277 batch_size: 1,
1278 lock_expiry: LOCK_EXPIRY,
1279 tick_sleep: Duration::ZERO,
1280 component_id: ComponentId::dummy_activity(),
1281 task_limiter: None,
1282 },
1283 sim_clock.clone(),
1284 db_pool.clone(),
1285 parent_worker,
1286 sim_clock.now(),
1287 )
1288 .await;
1289
1290 let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1291 let child_execution_id = parent_execution_id.next_level(&join_set_id);
1292 {
1294 let child = CreateRequest {
1295 created_at: sim_clock.now(),
1296 execution_id: ExecutionId::Derived(child_execution_id.clone()),
1297 ffqn: FFQN_CHILD,
1298 params: Params::empty(),
1299 parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1300 metadata: concepts::ExecutionMetadata::empty(),
1301 scheduled_at: sim_clock.now(),
1302 retry_exp_backoff: Duration::ZERO,
1303 max_retries: 0,
1304 component_id: ComponentId::dummy_activity(),
1305 scheduled_by: None,
1306 };
1307 let current_time = sim_clock.now();
1308 let join_set = AppendRequest {
1309 created_at: current_time,
1310 event: ExecutionEventInner::HistoryEvent {
1311 event: HistoryEvent::JoinSetCreate {
1312 join_set_id: join_set_id.clone(),
1313 closing_strategy: ClosingStrategy::Complete,
1314 },
1315 },
1316 };
1317 let child_exec_req = AppendRequest {
1318 created_at: current_time,
1319 event: ExecutionEventInner::HistoryEvent {
1320 event: HistoryEvent::JoinSetRequest {
1321 join_set_id: join_set_id.clone(),
1322 request: JoinSetRequest::ChildExecutionRequest {
1323 child_execution_id: child_execution_id.clone(),
1324 },
1325 },
1326 },
1327 };
1328 let join_next = AppendRequest {
1329 created_at: current_time,
1330 event: ExecutionEventInner::HistoryEvent {
1331 event: HistoryEvent::JoinNext {
1332 join_set_id: join_set_id.clone(),
1333 run_expires_at: sim_clock.now(),
1334 closing: false,
1335 },
1336 },
1337 };
1338 db_pool
1339 .connection()
1340 .append_batch_create_new_execution(
1341 current_time,
1342 vec![join_set, child_exec_req, join_next],
1343 parent_execution_id.clone(),
1344 Version::new(2),
1345 vec![child],
1346 )
1347 .await
1348 .unwrap();
1349 }
1350
1351 let child_worker =
1352 Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));
1353
1354 tick_fn(
1356 ExecConfig {
1357 batch_size: 1,
1358 lock_expiry: LOCK_EXPIRY,
1359 tick_sleep: Duration::ZERO,
1360 component_id: ComponentId::dummy_activity(),
1361 task_limiter: None,
1362 },
1363 sim_clock.clone(),
1364 db_pool.clone(),
1365 child_worker,
1366 sim_clock.now(),
1367 )
1368 .await;
1369 if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
1370 sim_clock.move_time_forward(LOCK_EXPIRY).await;
1372 expired_timers_watcher::tick(&db_pool.connection(), sim_clock.now())
1373 .await
1374 .unwrap();
1375 }
1376 let child_log = db_pool
1377 .connection()
1378 .get(&ExecutionId::Derived(child_execution_id.clone()))
1379 .await
1380 .unwrap();
1381 assert!(child_log.pending_state.is_finished());
1382 assert_eq!(
1383 Version(2),
1384 child_log.next_version,
1385 "created = 0, locked = 1, with_single_result = 2"
1386 );
1387 assert_eq!(
1388 ExecutionEventInner::Finished {
1389 result: Err(expected_child_err),
1390 http_client_traces: None
1391 },
1392 child_log.last_event().event
1393 );
1394 let parent_log = db_pool
1395 .connection()
1396 .get(&parent_execution_id)
1397 .await
1398 .unwrap();
1399 assert_matches!(
1400 parent_log.pending_state,
1401 PendingState::PendingAt {
1402 scheduled_at
1403 } if scheduled_at == sim_clock.now(),
1404 "parent should be back to pending"
1405 );
1406 let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1407 parent_log.responses.last(),
1408 Some(JoinSetResponseEventOuter{
1409 created_at: at,
1410 event: JoinSetResponseEvent{
1411 join_set_id: found_join_set_id,
1412 event: JoinSetResponse::ChildExecutionFinished {
1413 child_execution_id: found_child_execution_id,
1414 finished_version,
1415 result: found_result,
1416 }
1417 }
1418 })
1419 if *at == sim_clock.now()
1420 => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1421 );
1422 assert_eq!(join_set_id, *found_join_set_id);
1423 assert_eq!(child_execution_id, *found_child_execution_id);
1424 assert_eq!(child_log.next_version, *child_finished_version);
1425 assert!(found_result.is_err());
1426
1427 db_pool.close().await.unwrap();
1428 }
1429
1430 #[derive(Clone, Debug)]
1431 struct SleepyWorker {
1432 duration: Duration,
1433 result: SupportedFunctionReturnValue,
1434 exported: [FunctionMetadata; 1],
1435 }
1436
1437 #[async_trait]
1438 impl Worker for SleepyWorker {
1439 async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1440 tokio::time::sleep(self.duration).await;
1441 WorkerResult::Ok(self.result.clone(), ctx.version, None)
1442 }
1443
1444 fn exported_functions(&self) -> &[FunctionMetadata] {
1445 &self.exported
1446 }
1447 }
1448
1449 #[tokio::test]
1450 async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1451 set_up();
1452 let sim_clock = SimClock::default();
1453 let (_guard, db_pool) = Database::Memory.set_up().await;
1454 let lock_expiry = Duration::from_millis(100);
1455 let exec_config = ExecConfig {
1456 batch_size: 1,
1457 lock_expiry,
1458 tick_sleep: Duration::ZERO,
1459 component_id: ComponentId::dummy_activity(),
1460 task_limiter: None,
1461 };
1462
1463 let worker = Arc::new(SleepyWorker {
1464 duration: lock_expiry + Duration::from_millis(1), result: SupportedFunctionReturnValue::None,
1466 exported: [FunctionMetadata {
1467 ffqn: FFQN_SOME,
1468 parameter_types: ParameterTypes::default(),
1469 return_type: None,
1470 extension: None,
1471 submittable: true,
1472 }],
1473 });
1474 let execution_id = ExecutionId::generate();
1476 let timeout_duration = Duration::from_millis(300);
1477 let db_connection = db_pool.connection();
1478 db_connection
1479 .create(CreateRequest {
1480 created_at: sim_clock.now(),
1481 execution_id: execution_id.clone(),
1482 ffqn: FFQN_SOME,
1483 params: Params::empty(),
1484 parent: None,
1485 metadata: concepts::ExecutionMetadata::empty(),
1486 scheduled_at: sim_clock.now(),
1487 retry_exp_backoff: timeout_duration,
1488 max_retries: 1,
1489 component_id: ComponentId::dummy_activity(),
1490 scheduled_by: None,
1491 })
1492 .await
1493 .unwrap();
1494
1495 let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
1496 let executor = ExecTask::new(
1497 worker,
1498 exec_config,
1499 sim_clock.clone(),
1500 db_pool.clone(),
1501 ffqns,
1502 );
1503 let mut first_execution_progress = executor.tick(sim_clock.now()).await.unwrap();
1504 assert_eq!(1, first_execution_progress.executions.len());
1505 sim_clock.move_time_forward(lock_expiry).await;
1507 let now_after_first_lock_expiry = sim_clock.now();
1509 {
1510 debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1511 let cleanup_progress = executor.tick(now_after_first_lock_expiry).await.unwrap();
1512 assert!(cleanup_progress.executions.is_empty());
1513 }
1514 {
1515 let expired_locks =
1516 expired_timers_watcher::tick(&db_pool.connection(), now_after_first_lock_expiry)
1517 .await
1518 .unwrap()
1519 .expired_locks;
1520 assert_eq!(1, expired_locks);
1521 }
1522 assert!(
1523 !first_execution_progress
1524 .executions
1525 .pop()
1526 .unwrap()
1527 .1
1528 .is_finished()
1529 );
1530
1531 let execution_log = db_connection.get(&execution_id).await.unwrap();
1532 let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1533 assert_matches!(
1534 &execution_log.events.get(2).unwrap(),
1535 ExecutionEvent {
1536 event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
1537 created_at: at,
1538 backtrace_id: None,
1539 } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1540 );
1541 assert_eq!(
1542 PendingState::PendingAt {
1543 scheduled_at: expected_first_timeout_expiry
1544 },
1545 execution_log.pending_state
1546 );
1547 sim_clock.move_time_forward(timeout_duration).await;
1548 let now_after_first_timeout = sim_clock.now();
1549 debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1550
1551 let mut second_execution_progress = executor.tick(now_after_first_timeout).await.unwrap();
1552 assert_eq!(1, second_execution_progress.executions.len());
1553
1554 sim_clock.move_time_forward(lock_expiry).await;
1556 let now_after_second_lock_expiry = sim_clock.now();
1558 debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1559 {
1560 let cleanup_progress = executor.tick(now_after_second_lock_expiry).await.unwrap();
1561 assert!(cleanup_progress.executions.is_empty());
1562 }
1563 {
1564 let expired_locks =
1565 expired_timers_watcher::tick(&db_pool.connection(), now_after_second_lock_expiry)
1566 .await
1567 .unwrap()
1568 .expired_locks;
1569 assert_eq!(1, expired_locks);
1570 }
1571 assert!(
1572 !second_execution_progress
1573 .executions
1574 .pop()
1575 .unwrap()
1576 .1
1577 .is_finished()
1578 );
1579
1580 drop(db_connection);
1581 drop(executor);
1582 db_pool.close().await.unwrap();
1583 }
1584}