1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::storage::{
5 AppendRequest, DbPool, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
6 LockedExecution,
7};
8use concepts::time::ClockFn;
9use concepts::{ComponentId, FinishedExecutionResult, FunctionMetadata, StrVariant};
10use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
11use concepts::{
12 FinishedExecutionError,
13 storage::{DbConnection, DbError, ExecutionEventInner, JoinSetResponse, Version},
14};
15use concepts::{JoinSetId, PermanentFailureKind};
16use std::marker::PhantomData;
17use std::{
18 sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21 },
22 time::Duration,
23};
24use tokio::task::{AbortHandle, JoinHandle};
25use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
26
27#[derive(Debug, Clone)]
28pub struct ExecConfig {
29 pub lock_expiry: Duration,
30 pub tick_sleep: Duration,
31 pub batch_size: u32,
32 pub component_id: ComponentId,
33 pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
34}
35
36pub struct ExecTask<C: ClockFn, DB: DbConnection, P: DbPool<DB>> {
37 worker: Arc<dyn Worker>,
38 config: ExecConfig,
39 clock_fn: C, db_pool: P,
41 executor_id: ExecutorId,
42 phantom_data: PhantomData<DB>,
43 ffqns: Arc<[FunctionFqn]>,
44}
45
46#[derive(derive_more::Debug, Default)]
47pub struct ExecutionProgress {
48 #[debug(skip)]
49 #[allow(dead_code)]
50 executions: Vec<(ExecutionId, JoinHandle<()>)>,
51}
52
53impl ExecutionProgress {
54 #[cfg(feature = "test")]
55 pub async fn wait_for_tasks(self) -> Result<usize, tokio::task::JoinError> {
56 let execs = self.executions.len();
57 for (_, handle) in self.executions {
58 handle.await?;
59 }
60 Ok(execs)
61 }
62}
63
64#[derive(derive_more::Debug)]
65pub struct ExecutorTaskHandle {
66 #[debug(skip)]
67 is_closing: Arc<AtomicBool>,
68 #[debug(skip)]
69 abort_handle: AbortHandle,
70 component_id: ComponentId,
71 executor_id: ExecutorId,
72}
73
74impl ExecutorTaskHandle {
75 #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
76 pub async fn close(&self) {
77 trace!("Gracefully closing");
78 self.is_closing.store(true, Ordering::Relaxed);
79 while !self.abort_handle.is_finished() {
80 tokio::time::sleep(Duration::from_millis(1)).await;
81 }
82 debug!("Gracefully closed");
83 }
84}
85
86impl Drop for ExecutorTaskHandle {
87 #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
88 fn drop(&mut self) {
89 if self.abort_handle.is_finished() {
90 return;
91 }
92 warn!(executor_id= %self.executor_id, component_id=%self.component_id, "Aborting the executor task");
93 self.abort_handle.abort();
94 }
95}
96
/// Public wrapper around [`extract_ffqns`], compiled only with the `test` feature.
#[cfg(feature = "test")]
pub fn extract_ffqns_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    extract_ffqns(worker)
}
101
102pub(crate) fn extract_ffqns(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
103 worker
104 .exported_functions()
105 .iter()
106 .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
107 .collect::<Arc<_>>()
108}
109
110impl<C: ClockFn + 'static, DB: DbConnection + 'static, P: DbPool<DB> + 'static> ExecTask<C, DB, P> {
111 #[cfg(feature = "test")]
112 pub fn new(
113 worker: Arc<dyn Worker>,
114 config: ExecConfig,
115 clock_fn: C,
116 db_pool: P,
117 ffqns: Arc<[FunctionFqn]>,
118 ) -> Self {
119 Self {
120 worker,
121 config,
122 executor_id: ExecutorId::generate(),
123 db_pool,
124 phantom_data: PhantomData,
125 ffqns,
126 clock_fn,
127 }
128 }
129
130 pub fn spawn_new(
131 worker: Arc<dyn Worker>,
132 config: ExecConfig,
133 clock_fn: C,
134 db_pool: P,
135 executor_id: ExecutorId,
136 ) -> ExecutorTaskHandle {
137 let is_closing = Arc::new(AtomicBool::default());
138 let is_closing_inner = is_closing.clone();
139 let ffqns = extract_ffqns(worker.as_ref());
140 let component_id = config.component_id.clone();
141 let abort_handle = tokio::spawn(async move {
142 debug!(%executor_id, component_id = %config.component_id, "Spawned executor");
143 let task = Self {
144 worker,
145 config,
146 executor_id,
147 db_pool,
148 phantom_data: PhantomData,
149 ffqns: ffqns.clone(),
150 clock_fn: clock_fn.clone(),
151 };
152 loop {
153 let _ = task.tick(clock_fn.now()).await;
154 let executed_at = clock_fn.now();
155 task.db_pool
156 .connection()
157 .wait_for_pending(executed_at, ffqns.clone(), task.config.tick_sleep)
158 .await;
159 if is_closing_inner.load(Ordering::Relaxed) {
160 return;
161 }
162 }
163 })
164 .abort_handle();
165 ExecutorTaskHandle {
166 is_closing,
167 abort_handle,
168 component_id,
169 executor_id,
170 }
171 }
172
173 fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
174 if let Some(task_limiter) = &self.config.task_limiter {
175 let mut locks = Vec::new();
176 for _ in 0..self.config.batch_size {
177 if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
178 locks.push(Some(permit));
179 } else {
180 break;
181 }
182 }
183 locks
184 } else {
185 let mut vec = Vec::with_capacity(self.config.batch_size as usize);
186 for _ in 0..self.config.batch_size {
187 vec.push(None);
188 }
189 vec
190 }
191 }
192
193 #[cfg(feature = "test")]
194 pub async fn tick_test(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
195 self.tick(executed_at).await
196 }
197
198 #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.executor_id, component_id = %self.config.component_id))]
199 async fn tick(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
200 let locked_executions = {
201 let mut permits = self.acquire_task_permits();
202 if permits.is_empty() {
203 return Ok(ExecutionProgress::default());
204 }
205 let db_connection = self.db_pool.connection();
206 let lock_expires_at = executed_at + self.config.lock_expiry;
207 let locked_executions = db_connection
208 .lock_pending(
209 permits.len(), executed_at, self.ffqns.clone(),
212 executed_at, self.config.component_id.clone(),
214 self.executor_id,
215 lock_expires_at,
216 )
217 .await
218 .map_err(|err| {
219 warn!(executor_id = %self.executor_id, component_id = %self.config.component_id, "lock_pending error {err:?}");
220 })?;
221 while permits.len() > locked_executions.len() {
223 permits.pop();
224 }
225 assert_eq!(permits.len(), locked_executions.len());
226 locked_executions.into_iter().zip(permits)
227 };
228 let execution_deadline = executed_at + self.config.lock_expiry;
229
230 let mut executions = Vec::with_capacity(locked_executions.len());
231 for (locked_execution, permit) in locked_executions {
232 let execution_id = locked_execution.execution_id.clone();
233 let join_handle = {
234 let worker = self.worker.clone();
235 let db_pool = self.db_pool.clone();
236 let clock_fn = self.clock_fn.clone();
237 let run_id = locked_execution.run_id;
238 let worker_span = info_span!(parent: None, "worker",
239 "otel.name" = format!("worker {}", locked_execution.ffqn),
240 %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.executor_id, component_id = %self.config.component_id);
241 locked_execution.metadata.enrich(&worker_span);
242 tokio::spawn({
243 let worker_span2 = worker_span.clone();
244 async move {
245 let res = Self::run_worker(
246 worker,
247 &db_pool,
248 execution_deadline,
249 clock_fn,
250 locked_execution,
251 worker_span2,
252 )
253 .await;
254 if let Err(db_error) = res {
255 error!("Execution will be timed out not writing `{db_error:?}`");
256 }
257 drop(permit);
258 }
259 .instrument(worker_span)
260 })
261 };
262 executions.push((execution_id, join_handle));
263 }
264 Ok(ExecutionProgress { executions })
265 }
266
267 async fn run_worker(
268 worker: Arc<dyn Worker>,
269 db_pool: &P,
270 execution_deadline: DateTime<Utc>,
271 clock_fn: C,
272 locked_execution: LockedExecution,
273 worker_span: Span,
274 ) -> Result<(), DbError> {
275 debug!("Worker::run starting");
276 trace!(
277 version = %locked_execution.version,
278 params = ?locked_execution.params,
279 event_history = ?locked_execution.event_history,
280 "Worker::run starting"
281 );
282 let can_be_retried = ExecutionLog::can_be_retried_after(
283 locked_execution.temporary_event_count + 1,
284 locked_execution.max_retries,
285 locked_execution.retry_exp_backoff,
286 );
287 let unlock_expiry_on_limit_reached =
288 ExecutionLog::compute_retry_duration_when_retrying_forever(
289 locked_execution.temporary_event_count + 1,
290 locked_execution.retry_exp_backoff,
291 );
292 let ctx = WorkerContext {
293 execution_id: locked_execution.execution_id.clone(),
294 metadata: locked_execution.metadata,
295 ffqn: locked_execution.ffqn,
296 params: locked_execution.params,
297 event_history: locked_execution.event_history,
298 responses: locked_execution
299 .responses
300 .into_iter()
301 .map(|outer| outer.event)
302 .collect(),
303 version: locked_execution.version,
304 execution_deadline,
305 can_be_retried: can_be_retried.is_some(),
306 run_id: locked_execution.run_id,
307 worker_span,
308 };
309 let worker_result = worker.run(ctx).await;
310 trace!(?worker_result, "Worker::run finished");
311 let result_obtained_at = clock_fn.now();
312 match Self::worker_result_to_execution_event(
313 locked_execution.execution_id,
314 worker_result,
315 result_obtained_at,
316 locked_execution.parent,
317 can_be_retried,
318 unlock_expiry_on_limit_reached,
319 )? {
320 Some(append) => {
321 let db_connection = db_pool.connection();
322 trace!("Appending {append:?}");
323 append.clone().append(&db_connection).await
324 }
325 None => Ok(()),
326 }
327 }
328
329 #[expect(clippy::too_many_lines)]
332 fn worker_result_to_execution_event(
333 execution_id: ExecutionId,
334 worker_result: WorkerResult,
335 result_obtained_at: DateTime<Utc>,
336 parent: Option<(ExecutionId, JoinSetId)>,
337 can_be_retried: Option<Duration>,
338 unlock_expiry_on_limit_reached: Duration,
339 ) -> Result<Option<Append>, DbError> {
340 Ok(match worker_result {
341 WorkerResult::Ok(result, new_version, http_client_traces) => {
342 info!(
343 "Execution finished: {}",
344 result.as_pending_state_finished_result()
345 );
346 let child_finished =
347 parent.map(
348 |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
349 parent_execution_id,
350 parent_join_set,
351 result: Ok(result.clone()),
352 },
353 );
354 let primary_event = AppendRequest {
355 created_at: result_obtained_at,
356 event: ExecutionEventInner::Finished {
357 result: Ok(result),
358 http_client_traces,
359 },
360 };
361
362 Some(Append {
363 created_at: result_obtained_at,
364 primary_event,
365 execution_id,
366 version: new_version,
367 child_finished,
368 })
369 }
370 WorkerResult::DbUpdatedByWorker => None,
371 WorkerResult::Err(err) => {
372 let reason = err.to_string();
373 let (primary_event, child_finished, version) = match err {
374 WorkerError::TemporaryTimeoutHandledByWatcher => {
375 info!("Temporary timeout handled by watcher");
376 return Ok(None);
377 }
378 WorkerError::TemporaryTimeout {
379 http_client_traces,
380 version,
381 } => {
382 if let Some(duration) = can_be_retried {
383 let backoff_expires_at = result_obtained_at + duration;
384 info!("Temporary timeout after {duration:?} at {backoff_expires_at}");
385 (
386 ExecutionEventInner::TemporarilyTimedOut {
387 backoff_expires_at,
388 http_client_traces,
389 },
390 None,
391 version,
392 )
393 } else {
394 info!("Permanent timeout");
395 let result = Err(FinishedExecutionError::PermanentTimeout);
396 let child_finished =
397 parent.map(|(parent_execution_id, parent_join_set)| {
398 ChildFinishedResponse {
399 parent_execution_id,
400 parent_join_set,
401 result: result.clone(),
402 }
403 });
404 (
405 ExecutionEventInner::Finished {
406 result,
407 http_client_traces,
408 },
409 child_finished,
410 version,
411 )
412 }
413 }
414 WorkerError::DbError(db_error) => {
415 return Err(db_error);
416 }
417 WorkerError::ActivityTrap {
418 reason: reason_inner,
419 trap_kind,
420 detail,
421 version,
422 http_client_traces,
423 } => {
424 if let Some(duration) = can_be_retried {
425 let expires_at = result_obtained_at + duration;
426 debug!(
427 "Retrying activity {trap_kind} execution after {duration:?} at {expires_at}"
428 );
429 (
430 ExecutionEventInner::TemporarilyFailed {
431 backoff_expires_at: expires_at,
432 reason_full: StrVariant::from(reason),
433 reason_inner: StrVariant::from(reason_inner),
434 detail: Some(detail),
435 http_client_traces,
436 },
437 None,
438 version,
439 )
440 } else {
441 info!(
442 "Activity {trap_kind} marked as permanent failure - {reason_inner}"
443 );
444 let result = Err(FinishedExecutionError::PermanentFailure {
445 reason_inner,
446 reason_full: reason,
447 kind: PermanentFailureKind::ActivityTrap,
448 detail: Some(detail),
449 });
450 let child_finished =
451 parent.map(|(parent_execution_id, parent_join_set)| {
452 ChildFinishedResponse {
453 parent_execution_id,
454 parent_join_set,
455 result: result.clone(),
456 }
457 });
458 (
459 ExecutionEventInner::Finished {
460 result,
461 http_client_traces,
462 },
463 child_finished,
464 version,
465 )
466 }
467 }
468 WorkerError::ActivityReturnedError {
469 detail,
470 version,
471 http_client_traces,
472 } => {
473 let duration = can_be_retried.expect(
474 "ActivityReturnedError must not be returned when retries are exhausted",
475 );
476 let expires_at = result_obtained_at + duration;
477 debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
478 (
479 ExecutionEventInner::TemporarilyFailed {
480 backoff_expires_at: expires_at,
481 reason_full: StrVariant::Static("activity returned error"),
482 reason_inner: StrVariant::Static("activity returned error"),
483 detail,
484 http_client_traces,
485 },
486 None,
487 version,
488 )
489 }
490 WorkerError::TemporaryWorkflowTrap {
491 reason: reason_inner,
492 kind,
493 detail,
494 version,
495 } => {
496 let duration = can_be_retried.expect("workflows are retried forever");
497 let expires_at = result_obtained_at + duration;
498 debug!(
499 "Retrying workflow {kind} execution after {duration:?} at {expires_at}"
500 );
501 (
502 ExecutionEventInner::TemporarilyFailed {
503 backoff_expires_at: expires_at,
504 reason_full: StrVariant::from(reason),
505 reason_inner: StrVariant::from(reason_inner),
506 detail,
507 http_client_traces: None,
508 },
509 None,
510 version,
511 )
512 }
513 WorkerError::LimitReached {
514 reason: inner_reason,
515 version: new_version,
516 } => {
517 let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
518 warn!(
519 "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
520 );
521 (
522 ExecutionEventInner::Unlocked {
523 backoff_expires_at: expires_at,
524 reason: StrVariant::from(reason),
525 },
526 None,
527 new_version,
528 )
529 }
530 WorkerError::FatalError(fatal_error, version) => {
531 info!("Fatal worker error - {fatal_error:?}");
532 let result = Err(FinishedExecutionError::from(fatal_error));
533 let child_finished =
534 parent.map(|(parent_execution_id, parent_join_set)| {
535 ChildFinishedResponse {
536 parent_execution_id,
537 parent_join_set,
538 result: result.clone(),
539 }
540 });
541 (
542 ExecutionEventInner::Finished {
543 result,
544 http_client_traces: None,
545 },
546 child_finished,
547 version,
548 )
549 }
550 };
551 Some(Append {
552 created_at: result_obtained_at,
553 primary_event: AppendRequest {
554 created_at: result_obtained_at,
555 event: primary_event,
556 },
557 execution_id,
558 version,
559 child_finished,
560 })
561 }
562 })
563 }
564}
565
566#[derive(Debug, Clone)]
567pub(crate) struct ChildFinishedResponse {
568 pub(crate) parent_execution_id: ExecutionId,
569 pub(crate) parent_join_set: JoinSetId,
570 pub(crate) result: FinishedExecutionResult,
571}
572
573#[derive(Debug, Clone)]
574pub(crate) struct Append {
575 pub(crate) created_at: DateTime<Utc>,
576 pub(crate) primary_event: AppendRequest,
577 pub(crate) execution_id: ExecutionId,
578 pub(crate) version: Version,
579 pub(crate) child_finished: Option<ChildFinishedResponse>,
580}
581
582impl Append {
583 pub(crate) async fn append(self, db_connection: &impl DbConnection) -> Result<(), DbError> {
584 if let Some(child_finished) = self.child_finished {
585 assert_matches!(
586 &self.primary_event,
587 AppendRequest {
588 event: ExecutionEventInner::Finished { .. },
589 ..
590 }
591 );
592 let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
593 db_connection
594 .append_batch_respond_to_parent(
595 derived.clone(),
596 self.created_at,
597 vec![self.primary_event],
598 self.version.clone(),
599 child_finished.parent_execution_id,
600 JoinSetResponseEventOuter {
601 created_at: self.created_at,
602 event: JoinSetResponseEvent {
603 join_set_id: child_finished.parent_join_set,
604 event: JoinSetResponse::ChildExecutionFinished {
605 child_execution_id: derived,
606 finished_version: self.version,
608 result: child_finished.result,
609 },
610 },
611 },
612 )
613 .await?;
614 } else {
615 db_connection
616 .append_batch(
617 self.created_at,
618 vec![self.primary_event],
619 self.execution_id,
620 self.version,
621 )
622 .await?;
623 }
624 Ok(())
625 }
626}
627
/// Scripted [`Worker`](crate::worker::Worker) implementation for tests.
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    /// Function name exported by a [`SimpleWorker`] unless overridden with `with_ffqn`.
    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");

    /// Scripted runs keyed by the expected version; each entry holds the expected event
    /// history and the result to return. Entries are consumed from the back.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    /// Worker that replays scripted results and asserts on the context it receives.
    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    impl SimpleWorker {
        /// Metadata of the single exported function named `ffqn`.
        fn exported_metadata(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
            [FunctionMetadata {
                ffqn,
                parameter_types: ParameterTypes::default(),
                return_type: None,
                extension: None,
                submittable: true,
            }]
        }

        /// Worker returning `res` once, expecting version 2 and an empty event history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let scripted = IndexMap::from([(Version::new(2), (vec![], res))]);
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(scripted)))
        }

        /// Same scripted results, exported under `ffqn` instead.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            Self {
                worker_results_rev: self.worker_results_rev,
                exported: Self::exported_metadata(ffqn.clone()),
                ffqn,
            }
        }

        /// Worker exporting [`FFQN_SOME`] and replaying `worker_results_rev`.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: Self::exported_metadata(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        /// Pops the last scripted entry, checks version and event history against `ctx` and
        /// returns the scripted result. Panics when the script is empty or on a mismatch.
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            let (expected_version, (expected_eh, worker_result)) =
                self.worker_results_rev.lock().unwrap().pop().unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }

        fn imported_functions(&self) -> &[FunctionMetadata] {
            &[]
        }
    }
}
711
712#[cfg(test)]
713mod tests {
714 use self::simple_worker::SimpleWorker;
715 use super::*;
716 use crate::worker::FatalError;
717 use crate::{expired_timers_watcher, worker::WorkerResult};
718 use assert_matches::assert_matches;
719 use async_trait::async_trait;
720 use concepts::storage::{CreateRequest, JoinSetRequest};
721 use concepts::storage::{
722 DbConnection, ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState,
723 };
724 use concepts::time::Now;
725 use concepts::{
726 ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, StrVariant,
727 SupportedFunctionReturnValue, TrapKind,
728 };
729 use db_tests::Database;
730 use indexmap::IndexMap;
731 use simple_worker::FFQN_SOME;
732 use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
733 use test_utils::set_up;
734 use test_utils::sim_clock::{ConstClock, SimClock};
735
736 pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
737
738 async fn tick_fn<
739 W: Worker + Debug,
740 C: ClockFn + 'static,
741 DB: DbConnection + 'static,
742 P: DbPool<DB> + 'static,
743 >(
744 config: ExecConfig,
745 clock_fn: C,
746 db_pool: P,
747 worker: Arc<W>,
748 executed_at: DateTime<Utc>,
749 ) -> ExecutionProgress {
750 trace!("Ticking with {worker:?}");
751 let ffqns = super::extract_ffqns(worker.as_ref());
752 let executor = ExecTask::new(worker, config, clock_fn, db_pool, ffqns);
753 let mut execution_progress = executor.tick(executed_at).await.unwrap();
754 loop {
755 execution_progress
756 .executions
757 .retain(|(_, abort_handle)| !abort_handle.is_finished());
758 if execution_progress.executions.is_empty() {
759 return execution_progress;
760 }
761 tokio::time::sleep(Duration::from_millis(10)).await;
762 }
763 }
764
765 #[tokio::test]
766 async fn execute_simple_lifecycle_tick_based_mem() {
767 let created_at = Now.now();
768 let (_guard, db_pool) = Database::Memory.set_up().await;
769 execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
770 db_pool.close().await.unwrap();
771 }
772
773 #[cfg(not(madsim))]
774 #[tokio::test]
775 async fn execute_simple_lifecycle_tick_based_sqlite() {
776 let created_at = Now.now();
777 let (_guard, db_pool) = Database::Sqlite.set_up().await;
778 execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
779 db_pool.close().await.unwrap();
780 }
781
782 async fn execute_simple_lifecycle_tick_based<
783 DB: DbConnection + 'static,
784 P: DbPool<DB> + 'static,
785 C: ClockFn + 'static,
786 >(
787 pool: P,
788 clock_fn: C,
789 ) {
790 set_up();
791 let created_at = clock_fn.now();
792 let exec_config = ExecConfig {
793 batch_size: 1,
794 lock_expiry: Duration::from_secs(1),
795 tick_sleep: Duration::from_millis(100),
796 component_id: ComponentId::dummy_activity(),
797 task_limiter: None,
798 };
799
800 let execution_log = create_and_tick(
801 CreateAndTickConfig {
802 execution_id: ExecutionId::generate(),
803 created_at,
804 max_retries: 0,
805 executed_at: created_at,
806 retry_exp_backoff: Duration::ZERO,
807 },
808 clock_fn,
809 pool,
810 exec_config,
811 Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
812 SupportedFunctionReturnValue::None,
813 Version::new(2),
814 None,
815 ))),
816 tick_fn,
817 )
818 .await;
819 assert_matches!(
820 execution_log.events.get(2).unwrap(),
821 ExecutionEvent {
822 event: ExecutionEventInner::Finished {
823 result: Ok(SupportedFunctionReturnValue::None),
824 http_client_traces: None
825 },
826 created_at: _,
827 backtrace_id: None,
828 }
829 );
830 }
831
832 #[tokio::test]
833 async fn stochastic_execute_simple_lifecycle_task_based_mem() {
834 set_up();
835 let created_at = Now.now();
836 let clock_fn = ConstClock(created_at);
837 let (_guard, db_pool) = Database::Memory.set_up().await;
838 let exec_config = ExecConfig {
839 batch_size: 1,
840 lock_expiry: Duration::from_secs(1),
841 tick_sleep: Duration::ZERO,
842 component_id: ComponentId::dummy_activity(),
843 task_limiter: None,
844 };
845
846 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
847 SupportedFunctionReturnValue::None,
848 Version::new(2),
849 None,
850 )));
851 let exec_task = ExecTask::spawn_new(
852 worker.clone(),
853 exec_config.clone(),
854 clock_fn,
855 db_pool.clone(),
856 ExecutorId::generate(),
857 );
858
859 let execution_log = create_and_tick(
860 CreateAndTickConfig {
861 execution_id: ExecutionId::generate(),
862 created_at,
863 max_retries: 0,
864 executed_at: created_at,
865 retry_exp_backoff: Duration::ZERO,
866 },
867 clock_fn,
868 db_pool.clone(),
869 exec_config,
870 worker,
871 |_, _, _, _, _| async {
872 tokio::time::sleep(Duration::from_secs(1)).await; ExecutionProgress::default()
874 },
875 )
876 .await;
877 exec_task.close().await;
878 db_pool.close().await.unwrap();
879 assert_matches!(
880 execution_log.events.get(2).unwrap(),
881 ExecutionEvent {
882 event: ExecutionEventInner::Finished {
883 result: Ok(SupportedFunctionReturnValue::None),
884 http_client_traces: None
885 },
886 created_at: _,
887 backtrace_id: None,
888 }
889 );
890 }
891
892 struct CreateAndTickConfig {
893 execution_id: ExecutionId,
894 created_at: DateTime<Utc>,
895 max_retries: u32,
896 executed_at: DateTime<Utc>,
897 retry_exp_backoff: Duration,
898 }
899
900 async fn create_and_tick<
901 W: Worker,
902 C: ClockFn,
903 DB: DbConnection,
904 P: DbPool<DB>,
905 T: FnMut(ExecConfig, C, P, Arc<W>, DateTime<Utc>) -> F,
906 F: Future<Output = ExecutionProgress>,
907 >(
908 config: CreateAndTickConfig,
909 clock_fn: C,
910 db_pool: P,
911 exec_config: ExecConfig,
912 worker: Arc<W>,
913 mut tick: T,
914 ) -> ExecutionLog {
915 let db_connection = db_pool.connection();
917 db_connection
918 .create(CreateRequest {
919 created_at: config.created_at,
920 execution_id: config.execution_id.clone(),
921 ffqn: FFQN_SOME,
922 params: Params::empty(),
923 parent: None,
924 metadata: concepts::ExecutionMetadata::empty(),
925 scheduled_at: config.created_at,
926 retry_exp_backoff: config.retry_exp_backoff,
927 max_retries: config.max_retries,
928 component_id: ComponentId::dummy_activity(),
929 scheduled_by: None,
930 })
931 .await
932 .unwrap();
933 tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
935 let execution_log = db_connection.get(&config.execution_id).await.unwrap();
936 debug!("Execution history after tick: {execution_log:?}");
937 assert_matches!(
939 execution_log.events.first().unwrap(),
940 ExecutionEvent {
941 event: ExecutionEventInner::Created { .. },
942 created_at: actually_created_at,
943 backtrace_id: None,
944 }
945 if config.created_at == *actually_created_at
946 );
947 let locked_at = assert_matches!(
948 execution_log.events.get(1).unwrap(),
949 ExecutionEvent {
950 event: ExecutionEventInner::Locked { .. },
951 created_at: locked_at,
952 backtrace_id: None,
953 } if config.created_at <= *locked_at
954 => *locked_at
955 );
956 assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
957 event: _,
958 created_at: executed_at,
959 backtrace_id: None,
960 } if *executed_at >= locked_at);
961 execution_log
962 }
963
964 #[expect(clippy::too_many_lines)]
965 #[tokio::test]
966 async fn activity_trap_should_trigger_an_execution_retry() {
967 set_up();
968 let sim_clock = SimClock::default();
969 let (_guard, db_pool) = Database::Memory.set_up().await;
970 let exec_config = ExecConfig {
971 batch_size: 1,
972 lock_expiry: Duration::from_secs(1),
973 tick_sleep: Duration::ZERO,
974 component_id: ComponentId::dummy_activity(),
975 task_limiter: None,
976 };
977 let expected_reason = "error reason";
978 let expected_detail = "error detail";
979 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
980 WorkerError::ActivityTrap {
981 reason: expected_reason.to_string(),
982 trap_kind: concepts::TrapKind::Trap,
983 detail: expected_detail.to_string(),
984 version: Version::new(2),
985 http_client_traces: None,
986 },
987 )));
988 let retry_exp_backoff = Duration::from_millis(100);
989 debug!(now = %sim_clock.now(), "Creating an execution that should fail");
990 let execution_log = create_and_tick(
991 CreateAndTickConfig {
992 execution_id: ExecutionId::generate(),
993 created_at: sim_clock.now(),
994 max_retries: 1,
995 executed_at: sim_clock.now(),
996 retry_exp_backoff,
997 },
998 sim_clock.clone(),
999 db_pool.clone(),
1000 exec_config.clone(),
1001 worker,
1002 tick_fn,
1003 )
1004 .await;
1005 assert_eq!(3, execution_log.events.len());
1006 {
1007 let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
1008 &execution_log.events.get(2).unwrap(),
1009 ExecutionEvent {
1010 event: ExecutionEventInner::TemporarilyFailed {
1011 reason_inner,
1012 reason_full,
1013 detail,
1014 backoff_expires_at,
1015 http_client_traces: None,
1016 },
1017 created_at: at,
1018 backtrace_id: None,
1019 }
1020 => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
1021 );
1022 assert_eq!(expected_reason, reason_inner.deref());
1023 assert_eq!(
1024 format!("activity trap: {expected_reason}"),
1025 reason_full.deref()
1026 );
1027 assert_eq!(Some(expected_detail), detail.as_deref());
1028 assert_eq!(at, sim_clock.now());
1029 assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
1030 }
1031 let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
1032 std::sync::Mutex::new(IndexMap::from([(
1033 Version::new(4),
1034 (
1035 vec![],
1036 WorkerResult::Ok(SupportedFunctionReturnValue::None, Version::new(4), None),
1037 ),
1038 )])),
1039 )));
1040 assert!(
1042 tick_fn(
1043 exec_config.clone(),
1044 sim_clock.clone(),
1045 db_pool.clone(),
1046 worker.clone(),
1047 sim_clock.now(),
1048 )
1049 .await
1050 .executions
1051 .is_empty()
1052 );
1053 sim_clock.move_time_forward(retry_exp_backoff).await;
1055 tick_fn(
1056 exec_config,
1057 sim_clock.clone(),
1058 db_pool.clone(),
1059 worker,
1060 sim_clock.now(),
1061 )
1062 .await;
1063 let execution_log = {
1064 let db_connection = db_pool.connection();
1065 db_connection
1066 .get(&execution_log.execution_id)
1067 .await
1068 .unwrap()
1069 };
1070 debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
1071 assert_matches!(
1072 execution_log.events.get(3).unwrap(),
1073 ExecutionEvent {
1074 event: ExecutionEventInner::Locked { .. },
1075 created_at: at,
1076 backtrace_id: None,
1077 } if *at == sim_clock.now()
1078 );
1079 assert_matches!(
1080 execution_log.events.get(4).unwrap(),
1081 ExecutionEvent {
1082 event: ExecutionEventInner::Finished {
1083 result: Ok(SupportedFunctionReturnValue::None),
1084 http_client_traces: None
1085 },
1086 created_at: finished_at,
1087 backtrace_id: None,
1088 } if *finished_at == sim_clock.now()
1089 );
1090 db_pool.close().await.unwrap();
1091 }
1092
1093 #[tokio::test]
1094 async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1095 set_up();
1096 let created_at = Now.now();
1097 let clock_fn = ConstClock(created_at);
1098 let (_guard, db_pool) = Database::Memory.set_up().await;
1099 let exec_config = ExecConfig {
1100 batch_size: 1,
1101 lock_expiry: Duration::from_secs(1),
1102 tick_sleep: Duration::ZERO,
1103 component_id: ComponentId::dummy_activity(),
1104 task_limiter: None,
1105 };
1106
1107 let expected_reason = "error reason";
1108 let expected_detail = "error detail";
1109 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1110 WorkerError::ActivityTrap {
1111 reason: expected_reason.to_string(),
1112 trap_kind: concepts::TrapKind::Trap,
1113 detail: expected_detail.to_string(),
1114 version: Version::new(2),
1115 http_client_traces: None,
1116 },
1117 )));
1118 let execution_log = create_and_tick(
1119 CreateAndTickConfig {
1120 execution_id: ExecutionId::generate(),
1121 created_at,
1122 max_retries: 0,
1123 executed_at: created_at,
1124 retry_exp_backoff: Duration::ZERO,
1125 },
1126 clock_fn,
1127 db_pool.clone(),
1128 exec_config.clone(),
1129 worker,
1130 tick_fn,
1131 )
1132 .await;
1133 assert_eq!(3, execution_log.events.len());
1134 let (reason, kind, detail) = assert_matches!(
1135 &execution_log.events.get(2).unwrap(),
1136 ExecutionEvent {
1137 event: ExecutionEventInner::Finished{
1138 result: Err(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_}),
1139 http_client_traces: None
1140 },
1141 created_at: at,
1142 backtrace_id: None,
1143 } if *at == created_at
1144 => (reason_inner, kind, detail)
1145 );
1146 assert_eq!(expected_reason, *reason);
1147 assert_eq!(Some(expected_detail), detail.as_deref());
1148 assert_eq!(PermanentFailureKind::ActivityTrap, *kind);
1149
1150 db_pool.close().await.unwrap();
1151 }
1152
1153 #[tokio::test]
1154 async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1155 let worker_error = WorkerError::ActivityTrap {
1156 reason: "error reason".to_string(),
1157 trap_kind: TrapKind::Trap,
1158 detail: "detail".to_string(),
1159 version: Version::new(2),
1160 http_client_traces: None,
1161 };
1162 let expected_child_err = FinishedExecutionError::PermanentFailure {
1163 reason_full: "activity trap: error reason".to_string(),
1164 reason_inner: "error reason".to_string(),
1165 kind: PermanentFailureKind::ActivityTrap,
1166 detail: Some("detail".to_string()),
1167 };
1168 child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
1169 .await;
1170 }
1171
1172 #[tokio::test]
1175 async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1176 let worker_error = WorkerError::TemporaryTimeoutHandledByWatcher;
1177 let expected_child_err = FinishedExecutionError::PermanentTimeout;
1178 child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
1179 .await;
1180 }
1181
1182 #[tokio::test]
1183 async fn child_execution_permanently_failed_should_notify_parent_unhandled_child() {
1184 let parent_id = ExecutionId::from_parts(1, 1);
1185 let join_set_id_outer =
1186 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("outer")).unwrap();
1187 let root_cause_id = parent_id.next_level(&join_set_id_outer);
1188 let join_set_id_inner =
1189 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("inner")).unwrap();
1190 let child_execution_id = root_cause_id.next_level(&join_set_id_inner);
1191 let worker_error = WorkerError::FatalError(
1192 FatalError::UnhandledChildExecutionError {
1193 child_execution_id: child_execution_id.clone(),
1194 root_cause_id: root_cause_id.clone(),
1195 },
1196 Version::new(2),
1197 );
1198 let expected_child_err = FinishedExecutionError::UnhandledChildExecutionError {
1199 child_execution_id,
1200 root_cause_id,
1201 };
1202 child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
1203 .await;
1204 }
1205
1206 #[expect(clippy::too_many_lines)]
1207 async fn child_execution_permanently_failed_should_notify_parent(
1208 worker_error: WorkerError,
1209 expected_child_err: FinishedExecutionError,
1210 ) {
1211 use concepts::storage::JoinSetResponseEventOuter;
1212 const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1213
1214 set_up();
1215 let sim_clock = SimClock::default();
1216 let (_guard, db_pool) = Database::Memory.set_up().await;
1217
1218 let parent_worker = Arc::new(SimpleWorker::with_single_result(
1219 WorkerResult::DbUpdatedByWorker,
1220 ));
1221 let parent_execution_id = ExecutionId::generate();
1222 db_pool
1223 .connection()
1224 .create(CreateRequest {
1225 created_at: sim_clock.now(),
1226 execution_id: parent_execution_id.clone(),
1227 ffqn: FFQN_SOME,
1228 params: Params::empty(),
1229 parent: None,
1230 metadata: concepts::ExecutionMetadata::empty(),
1231 scheduled_at: sim_clock.now(),
1232 retry_exp_backoff: Duration::ZERO,
1233 max_retries: 0,
1234 component_id: ComponentId::dummy_activity(),
1235 scheduled_by: None,
1236 })
1237 .await
1238 .unwrap();
1239 tick_fn(
1240 ExecConfig {
1241 batch_size: 1,
1242 lock_expiry: LOCK_EXPIRY,
1243 tick_sleep: Duration::ZERO,
1244 component_id: ComponentId::dummy_activity(),
1245 task_limiter: None,
1246 },
1247 sim_clock.clone(),
1248 db_pool.clone(),
1249 parent_worker,
1250 sim_clock.now(),
1251 )
1252 .await;
1253
1254 let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1255 let child_execution_id = parent_execution_id.next_level(&join_set_id);
1256 {
1258 let child = CreateRequest {
1259 created_at: sim_clock.now(),
1260 execution_id: ExecutionId::Derived(child_execution_id.clone()),
1261 ffqn: FFQN_CHILD,
1262 params: Params::empty(),
1263 parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1264 metadata: concepts::ExecutionMetadata::empty(),
1265 scheduled_at: sim_clock.now(),
1266 retry_exp_backoff: Duration::ZERO,
1267 max_retries: 0,
1268 component_id: ComponentId::dummy_activity(),
1269 scheduled_by: None,
1270 };
1271 let current_time = sim_clock.now();
1272 let join_set = AppendRequest {
1273 created_at: current_time,
1274 event: ExecutionEventInner::HistoryEvent {
1275 event: HistoryEvent::JoinSetCreate {
1276 join_set_id: join_set_id.clone(),
1277 closing_strategy: ClosingStrategy::Complete,
1278 },
1279 },
1280 };
1281 let child_exec_req = AppendRequest {
1282 created_at: current_time,
1283 event: ExecutionEventInner::HistoryEvent {
1284 event: HistoryEvent::JoinSetRequest {
1285 join_set_id: join_set_id.clone(),
1286 request: JoinSetRequest::ChildExecutionRequest {
1287 child_execution_id: child_execution_id.clone(),
1288 },
1289 },
1290 },
1291 };
1292 let join_next = AppendRequest {
1293 created_at: current_time,
1294 event: ExecutionEventInner::HistoryEvent {
1295 event: HistoryEvent::JoinNext {
1296 join_set_id: join_set_id.clone(),
1297 run_expires_at: sim_clock.now(),
1298 closing: false,
1299 },
1300 },
1301 };
1302 db_pool
1303 .connection()
1304 .append_batch_create_new_execution(
1305 current_time,
1306 vec![join_set, child_exec_req, join_next],
1307 parent_execution_id.clone(),
1308 Version::new(2),
1309 vec![child],
1310 )
1311 .await
1312 .unwrap();
1313 }
1314
1315 let child_worker = Arc::new(
1316 SimpleWorker::with_single_result(WorkerResult::Err(worker_error)).with_ffqn(FFQN_CHILD),
1317 );
1318
1319 tick_fn(
1321 ExecConfig {
1322 batch_size: 1,
1323 lock_expiry: LOCK_EXPIRY,
1324 tick_sleep: Duration::ZERO,
1325 component_id: ComponentId::dummy_activity(),
1326 task_limiter: None,
1327 },
1328 sim_clock.clone(),
1329 db_pool.clone(),
1330 child_worker,
1331 sim_clock.now(),
1332 )
1333 .await;
1334 if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
1335 sim_clock.move_time_forward(LOCK_EXPIRY).await;
1337 expired_timers_watcher::tick(&db_pool.connection(), sim_clock.now())
1338 .await
1339 .unwrap();
1340 }
1341 let child_log = db_pool
1342 .connection()
1343 .get(&ExecutionId::Derived(child_execution_id.clone()))
1344 .await
1345 .unwrap();
1346 assert!(child_log.pending_state.is_finished());
1347 assert_eq!(
1348 Version(2),
1349 child_log.next_version,
1350 "created = 0, locked = 1, with_single_result = 2"
1351 );
1352 assert_eq!(
1353 ExecutionEventInner::Finished {
1354 result: Err(expected_child_err),
1355 http_client_traces: None
1356 },
1357 child_log.last_event().event
1358 );
1359 let parent_log = db_pool
1360 .connection()
1361 .get(&parent_execution_id)
1362 .await
1363 .unwrap();
1364 assert_matches!(
1365 parent_log.pending_state,
1366 PendingState::PendingAt {
1367 scheduled_at
1368 } if scheduled_at == sim_clock.now(),
1369 "parent should be back to pending"
1370 );
1371 let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1372 parent_log.responses.last(),
1373 Some(JoinSetResponseEventOuter{
1374 created_at: at,
1375 event: JoinSetResponseEvent{
1376 join_set_id: found_join_set_id,
1377 event: JoinSetResponse::ChildExecutionFinished {
1378 child_execution_id: found_child_execution_id,
1379 finished_version,
1380 result: found_result,
1381 }
1382 }
1383 })
1384 if *at == sim_clock.now()
1385 => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1386 );
1387 assert_eq!(join_set_id, *found_join_set_id);
1388 assert_eq!(child_execution_id, *found_child_execution_id);
1389 assert_eq!(child_log.next_version, *child_finished_version);
1390 assert!(found_result.is_err());
1391
1392 db_pool.close().await.unwrap();
1393 }
1394
1395 #[derive(Clone, Debug)]
1396 struct SleepyWorker {
1397 duration: Duration,
1398 result: SupportedFunctionReturnValue,
1399 exported: [FunctionMetadata; 1],
1400 }
1401
1402 #[async_trait]
1403 impl Worker for SleepyWorker {
1404 async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1405 tokio::time::sleep(self.duration).await;
1406 WorkerResult::Ok(self.result.clone(), ctx.version, None)
1407 }
1408
1409 fn exported_functions(&self) -> &[FunctionMetadata] {
1410 &self.exported
1411 }
1412
1413 fn imported_functions(&self) -> &[FunctionMetadata] {
1414 &[]
1415 }
1416 }
1417
1418 #[expect(clippy::too_many_lines)]
1419 #[tokio::test]
1420 async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1421 set_up();
1422 let sim_clock = SimClock::default();
1423 let (_guard, db_pool) = Database::Memory.set_up().await;
1424 let lock_expiry = Duration::from_millis(100);
1425 let exec_config = ExecConfig {
1426 batch_size: 1,
1427 lock_expiry,
1428 tick_sleep: Duration::ZERO,
1429 component_id: ComponentId::dummy_activity(),
1430 task_limiter: None,
1431 };
1432
1433 let worker = Arc::new(SleepyWorker {
1434 duration: lock_expiry + Duration::from_millis(1), result: SupportedFunctionReturnValue::None,
1436 exported: [FunctionMetadata {
1437 ffqn: FFQN_SOME,
1438 parameter_types: ParameterTypes::default(),
1439 return_type: None,
1440 extension: None,
1441 submittable: true,
1442 }],
1443 });
1444 let execution_id = ExecutionId::generate();
1446 let timeout_duration = Duration::from_millis(300);
1447 let db_connection = db_pool.connection();
1448 db_connection
1449 .create(CreateRequest {
1450 created_at: sim_clock.now(),
1451 execution_id: execution_id.clone(),
1452 ffqn: FFQN_SOME,
1453 params: Params::empty(),
1454 parent: None,
1455 metadata: concepts::ExecutionMetadata::empty(),
1456 scheduled_at: sim_clock.now(),
1457 retry_exp_backoff: timeout_duration,
1458 max_retries: 1,
1459 component_id: ComponentId::dummy_activity(),
1460 scheduled_by: None,
1461 })
1462 .await
1463 .unwrap();
1464
1465 let ffqns = super::extract_ffqns(worker.as_ref());
1466 let executor = ExecTask::new(
1467 worker,
1468 exec_config,
1469 sim_clock.clone(),
1470 db_pool.clone(),
1471 ffqns,
1472 );
1473 let mut first_execution_progress = executor.tick(sim_clock.now()).await.unwrap();
1474 assert_eq!(1, first_execution_progress.executions.len());
1475 sim_clock.move_time_forward(lock_expiry).await;
1477 let now_after_first_lock_expiry = sim_clock.now();
1479 {
1480 debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1481 let cleanup_progress = executor.tick(now_after_first_lock_expiry).await.unwrap();
1482 assert!(cleanup_progress.executions.is_empty());
1483 }
1484 {
1485 let expired_locks =
1486 expired_timers_watcher::tick(&db_pool.connection(), now_after_first_lock_expiry)
1487 .await
1488 .unwrap()
1489 .expired_locks;
1490 assert_eq!(1, expired_locks);
1491 }
1492 assert!(
1493 !first_execution_progress
1494 .executions
1495 .pop()
1496 .unwrap()
1497 .1
1498 .is_finished()
1499 );
1500
1501 let execution_log = db_connection.get(&execution_id).await.unwrap();
1502 let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1503 assert_matches!(
1504 &execution_log.events.get(2).unwrap(),
1505 ExecutionEvent {
1506 event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
1507 created_at: at,
1508 backtrace_id: None,
1509 } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1510 );
1511 assert_eq!(
1512 PendingState::PendingAt {
1513 scheduled_at: expected_first_timeout_expiry
1514 },
1515 execution_log.pending_state
1516 );
1517 sim_clock.move_time_forward(timeout_duration).await;
1518 let now_after_first_timeout = sim_clock.now();
1519 debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1520
1521 let mut second_execution_progress = executor.tick(now_after_first_timeout).await.unwrap();
1522 assert_eq!(1, second_execution_progress.executions.len());
1523
1524 sim_clock.move_time_forward(lock_expiry).await;
1526 let now_after_second_lock_expiry = sim_clock.now();
1528 debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1529 {
1530 let cleanup_progress = executor.tick(now_after_second_lock_expiry).await.unwrap();
1531 assert!(cleanup_progress.executions.is_empty());
1532 }
1533 {
1534 let expired_locks =
1535 expired_timers_watcher::tick(&db_pool.connection(), now_after_second_lock_expiry)
1536 .await
1537 .unwrap()
1538 .expired_locks;
1539 assert_eq!(1, expired_locks);
1540 }
1541 assert!(
1542 !second_execution_progress
1543 .executions
1544 .pop()
1545 .unwrap()
1546 .1
1547 .is_finished()
1548 );
1549
1550 drop(db_connection);
1551 drop(executor);
1552 db_pool.close().await.unwrap();
1553 }
1554}