1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::storage::{
5 AppendRequest, DbPool, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
6 LockedExecution,
7};
8use concepts::time::ClockFn;
9use concepts::{ComponentId, FinishedExecutionResult, FunctionMetadata, StrVariant};
10use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
11use concepts::{
12 FinishedExecutionError,
13 storage::{DbConnection, DbError, ExecutionEventInner, JoinSetResponse, Version},
14};
15use concepts::{JoinSetId, PermanentFailureKind};
16use std::marker::PhantomData;
17use std::{
18 sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21 },
22 time::Duration,
23};
24use tokio::task::{AbortHandle, JoinHandle};
25use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
26
/// Configuration of a single executor (see `ExecTask`).
#[derive(Debug, Clone)]
pub struct ExecConfig {
    /// Added to the tick time to obtain the lock expiry passed to `lock_pending`;
    /// the same instant is handed to workers as their execution deadline.
    pub lock_expiry: Duration,
    /// Passed to `wait_for_pending` between ticks by the loop started in `spawn_new`
    /// (NOTE(review): presumably the maximum wait — confirm in the `DbConnection` impl).
    pub tick_sleep: Duration,
    /// Upper bound on the number of executions locked (and workers spawned) per tick.
    pub batch_size: u32,
    /// Passed to `lock_pending` and recorded in log/span fields.
    pub component_id: ComponentId,
    /// Optional cap on concurrently running worker tasks: each spawned worker holds one
    /// permit until it finishes. It is an `Arc`, so it may be shared between executors.
    /// `None` means the batch is limited only by `batch_size`.
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
}
35
36pub struct ExecTask<C: ClockFn, DB: DbConnection, P: DbPool<DB>> {
37 worker: Arc<dyn Worker>,
38 config: ExecConfig,
39 clock_fn: C, db_pool: P,
41 executor_id: ExecutorId,
42 phantom_data: PhantomData<DB>,
43 ffqns: Arc<[FunctionFqn]>,
44}
45
/// Worker tasks spawned by a single executor tick.
#[derive(derive_more::Debug, Default)]
pub struct ExecutionProgress {
    /// One entry per locked execution: its id and the handle of the spawned worker task.
    #[debug(skip)]
    // Only read by `wait_for_tasks` (behind `feature = "test"`) and by unit tests;
    // in other builds the field is never read.
    #[allow(dead_code)]
    executions: Vec<(ExecutionId, JoinHandle<()>)>,
}
52
53impl ExecutionProgress {
54 #[cfg(feature = "test")]
55 pub async fn wait_for_tasks(self) -> Result<usize, tokio::task::JoinError> {
56 let execs = self.executions.len();
57 for (_, handle) in self.executions {
58 handle.await?;
59 }
60 Ok(execs)
61 }
62}
63
/// Handle to the executor loop spawned by `ExecTask::spawn_new`.
///
/// Call [`ExecutorTaskHandle::close`] for a graceful shutdown; dropping the handle while
/// the task is still running aborts it instead.
#[derive(derive_more::Debug)]
pub struct ExecutorTaskHandle {
    /// Set by `close`; the executor loop checks it after each tick/wait cycle.
    #[debug(skip)]
    is_closing: Arc<AtomicBool>,
    /// Used by `close` to observe completion and by `drop` to abort the task.
    #[debug(skip)]
    abort_handle: AbortHandle,
    component_id: ComponentId,
    executor_id: ExecutorId,
}
73
74impl ExecutorTaskHandle {
75 #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
76 pub async fn close(&self) {
77 trace!("Gracefully closing");
78 self.is_closing.store(true, Ordering::Relaxed);
79 while !self.abort_handle.is_finished() {
80 tokio::time::sleep(Duration::from_millis(1)).await;
81 }
82 debug!("Gracefully closed");
83 }
84}
85
86impl Drop for ExecutorTaskHandle {
87 #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
88 fn drop(&mut self) {
89 if self.abort_handle.is_finished() {
90 return;
91 }
92 warn!(executor_id= %self.executor_id, component_id=%self.component_id, "Aborting the executor task");
93 self.abort_handle.abort();
94 }
95}
96
/// Test-only public entry point for [`extract_ffqns`].
#[cfg(feature = "test")]
pub fn extract_ffqns_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    self::extract_ffqns(worker)
}
101
102pub(crate) fn extract_ffqns(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
103 worker
104 .exported_functions()
105 .iter()
106 .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
107 .collect::<Arc<_>>()
108}
109
110impl<C: ClockFn + 'static, DB: DbConnection + 'static, P: DbPool<DB> + 'static> ExecTask<C, DB, P> {
111 #[cfg(feature = "test")]
112 pub fn new(
113 worker: Arc<dyn Worker>,
114 config: ExecConfig,
115 clock_fn: C,
116 db_pool: P,
117 ffqns: Arc<[FunctionFqn]>,
118 ) -> Self {
119 Self {
120 worker,
121 config,
122 executor_id: ExecutorId::generate(),
123 db_pool,
124 phantom_data: PhantomData,
125 ffqns,
126 clock_fn,
127 }
128 }
129
    /// Spawns the executor loop on the Tokio runtime and returns a handle for closing
    /// or aborting it.
    ///
    /// Each loop iteration calls `tick`, then awaits the connection's `wait_for_pending`
    /// with `config.tick_sleep` (NOTE(review): presumably returns early once matching
    /// executions become pending — confirm in the `DbConnection` impl), and finally exits
    /// if `ExecutorTaskHandle::close` has set the closing flag. The flag is checked only
    /// after a full tick/wait cycle, so at least one tick always runs.
    pub fn spawn_new(
        worker: Arc<dyn Worker>,
        config: ExecConfig,
        clock_fn: C,
        db_pool: P,
        executor_id: ExecutorId,
    ) -> ExecutorTaskHandle {
        let is_closing = Arc::new(AtomicBool::default());
        let is_closing_inner = is_closing.clone();
        let ffqns = extract_ffqns(worker.as_ref());
        // Kept outside the task for the returned handle; `config` moves into the task.
        let component_id = config.component_id.clone();
        let abort_handle = tokio::spawn(async move {
            debug!(%executor_id, component_id = %config.component_id, "Spawned executor");
            let task = Self {
                worker,
                config,
                executor_id,
                db_pool,
                phantom_data: PhantomData,
                ffqns: ffqns.clone(),
                clock_fn: clock_fn.clone(),
            };
            loop {
                // `tick` already logs its only error path (lock_pending failure), so the
                // result is ignored; spawned worker tasks are detached here.
                let _ = task.tick(clock_fn.now()).await;
                let executed_at = clock_fn.now();
                task.db_pool
                    .connection()
                    .wait_for_pending(executed_at, ffqns.clone(), task.config.tick_sleep)
                    .await;
                if is_closing_inner.load(Ordering::Relaxed) {
                    return;
                }
            }
        })
        // Only the abort handle is kept; dropping the JoinHandle detaches the task.
        .abort_handle();
        ExecutorTaskHandle {
            is_closing,
            abort_handle,
            component_id,
            executor_id,
        }
    }
172
173 fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
174 if let Some(task_limiter) = &self.config.task_limiter {
175 let mut locks = Vec::new();
176 for _ in 0..self.config.batch_size {
177 if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
178 locks.push(Some(permit));
179 } else {
180 break;
181 }
182 }
183 locks
184 } else {
185 let mut vec = Vec::with_capacity(self.config.batch_size as usize);
186 for _ in 0..self.config.batch_size {
187 vec.push(None);
188 }
189 vec
190 }
191 }
192
193 #[cfg(feature = "test")]
194 pub async fn tick_test(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
195 self.tick(executed_at).await
196 }
197
198 #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.executor_id, component_id = %self.config.component_id))]
199 async fn tick(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
200 let locked_executions = {
201 let mut permits = self.acquire_task_permits();
202 if permits.is_empty() {
203 return Ok(ExecutionProgress::default());
204 }
205 let db_connection = self.db_pool.connection();
206 let lock_expires_at = executed_at + self.config.lock_expiry;
207 let locked_executions = db_connection
208 .lock_pending(
209 permits.len(), executed_at, self.ffqns.clone(),
212 executed_at, self.config.component_id.clone(),
214 self.executor_id,
215 lock_expires_at,
216 )
217 .await
218 .map_err(|err| {
219 warn!(executor_id = %self.executor_id, component_id = %self.config.component_id, "lock_pending error {err:?}");
220 })?;
221 while permits.len() > locked_executions.len() {
223 permits.pop();
224 }
225 assert_eq!(permits.len(), locked_executions.len());
226 locked_executions.into_iter().zip(permits)
227 };
228 let execution_deadline = executed_at + self.config.lock_expiry;
229
230 let mut executions = Vec::with_capacity(locked_executions.len());
231 for (locked_execution, permit) in locked_executions {
232 let execution_id = locked_execution.execution_id.clone();
233 let join_handle = {
234 let worker = self.worker.clone();
235 let db_pool = self.db_pool.clone();
236 let clock_fn = self.clock_fn.clone();
237 let run_id = locked_execution.run_id;
238 let worker_span = info_span!(parent: None, "worker",
239 "otel.name" = format!("worker {}", locked_execution.ffqn),
240 %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.executor_id, component_id = %self.config.component_id);
241 locked_execution.metadata.enrich(&worker_span);
242 tokio::spawn({
243 let worker_span2 = worker_span.clone();
244 async move {
245 let res = Self::run_worker(
246 worker,
247 &db_pool,
248 execution_deadline,
249 clock_fn,
250 locked_execution,
251 worker_span2,
252 )
253 .await;
254 if let Err(db_error) = res {
255 error!("Execution will be timed out not writing `{db_error:?}`");
256 }
257 drop(permit);
258 }
259 .instrument(worker_span)
260 })
261 };
262 executions.push((execution_id, join_handle));
263 }
264 Ok(ExecutionProgress { executions })
265 }
266
267 async fn run_worker(
268 worker: Arc<dyn Worker>,
269 db_pool: &P,
270 execution_deadline: DateTime<Utc>,
271 clock_fn: C,
272 locked_execution: LockedExecution,
273 worker_span: Span,
274 ) -> Result<(), DbError> {
275 debug!("Worker::run starting");
276 trace!(
277 version = %locked_execution.version,
278 params = ?locked_execution.params,
279 event_history = ?locked_execution.event_history,
280 "Worker::run starting"
281 );
282 let can_be_retried = ExecutionLog::can_be_retried_after(
283 locked_execution.temporary_event_count + 1,
284 locked_execution.max_retries,
285 locked_execution.retry_exp_backoff,
286 );
287 let unlock_expiry_on_limit_reached =
288 ExecutionLog::compute_retry_duration_when_retrying_forever(
289 locked_execution.temporary_event_count + 1,
290 locked_execution.retry_exp_backoff,
291 );
292 let ctx = WorkerContext {
293 execution_id: locked_execution.execution_id.clone(),
294 metadata: locked_execution.metadata,
295 ffqn: locked_execution.ffqn,
296 params: locked_execution.params,
297 event_history: locked_execution.event_history,
298 responses: locked_execution
299 .responses
300 .into_iter()
301 .map(|outer| outer.event)
302 .collect(),
303 version: locked_execution.version,
304 execution_deadline,
305 can_be_retried: can_be_retried.is_some(),
306 run_id: locked_execution.run_id,
307 worker_span,
308 };
309 let worker_result = worker.run(ctx).await;
310 trace!(?worker_result, "Worker::run finished");
311 let result_obtained_at = clock_fn.now();
312 match Self::worker_result_to_execution_event(
313 locked_execution.execution_id,
314 worker_result,
315 result_obtained_at,
316 locked_execution.parent,
317 can_be_retried,
318 unlock_expiry_on_limit_reached,
319 )? {
320 Some(append) => {
321 let db_connection = db_pool.connection();
322 trace!("Appending {append:?}");
323 append.clone().append(&db_connection).await
324 }
325 None => Ok(()),
326 }
327 }
328
    /// Converts a worker's outcome into the [`Append`] to persist for this execution.
    ///
    /// Returns:
    /// * `Ok(None)` for `WorkerResult::DbUpdatedByWorkerOrWatcher` (nothing to write).
    /// * `Err` only when the worker itself reported `WorkerError::DbError`.
    /// * Otherwise `Ok(Some(append))`. Its `child_finished` is populated only when the
    ///   primary event is `Finished` and `parent` is `Some`, so the parent's join set
    ///   can be notified.
    ///
    /// Parameters:
    /// * `can_be_retried` — `Some(backoff)` when another attempt is allowed. It selects
    ///   between temporary (`TemporarilyTimedOut` / `TemporarilyFailed`) and permanent
    ///   (`Finished`) events.
    /// * `unlock_expiry_on_limit_reached` — backoff used for `WorkerError::LimitReached`,
    ///   which always yields an `Unlocked` event regardless of `can_be_retried`.
    ///
    /// # Panics
    /// If `ActivityReturnedError` or `TemporaryWorkflowTrap` arrives while
    /// `can_be_retried` is `None` (see the `expect` messages).
    #[expect(clippy::too_many_lines)]
    fn worker_result_to_execution_event(
        execution_id: ExecutionId,
        worker_result: WorkerResult,
        result_obtained_at: DateTime<Utc>,
        parent: Option<(ExecutionId, JoinSetId)>,
        can_be_retried: Option<Duration>,
        unlock_expiry_on_limit_reached: Duration,
    ) -> Result<Option<Append>, DbError> {
        Ok(match worker_result {
            // Success: always a `Finished` event at the version reported by the worker.
            WorkerResult::Ok(result, new_version, http_client_traces) => {
                info!(
                    "Execution finished: {}",
                    result.as_pending_state_finished_result()
                );
                let child_finished =
                    parent.map(
                        |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
                            parent_execution_id,
                            parent_join_set,
                            result: Ok(result.clone()),
                        },
                    );
                let primary_event = AppendRequest {
                    created_at: result_obtained_at,
                    event: ExecutionEventInner::Finished {
                        result: Ok(result),
                        http_client_traces,
                    },
                };

                Some(Append {
                    created_at: result_obtained_at,
                    primary_event,
                    execution_id,
                    version: new_version,
                    child_finished,
                })
            }
            WorkerResult::DbUpdatedByWorkerOrWatcher => None,
            WorkerResult::Err(err) => {
                // Full `Display` text of the error, captured before `err` is destructured.
                let reason = err.to_string();
                let (primary_event, child_finished, version) = match err {
                    WorkerError::TemporaryTimeout {
                        http_client_traces,
                        version,
                    } => {
                        if let Some(duration) = can_be_retried {
                            let backoff_expires_at = result_obtained_at + duration;
                            info!("Temporary timeout after {duration:?} at {backoff_expires_at}");
                            (
                                ExecutionEventInner::TemporarilyTimedOut {
                                    backoff_expires_at,
                                    http_client_traces,
                                },
                                None,
                                version,
                            )
                        } else {
                            // Retries exhausted: the timeout becomes final.
                            info!("Permanent timeout");
                            let result = Err(FinishedExecutionError::PermanentTimeout);
                            let child_finished =
                                parent.map(|(parent_execution_id, parent_join_set)| {
                                    ChildFinishedResponse {
                                        parent_execution_id,
                                        parent_join_set,
                                        result: result.clone(),
                                    }
                                });
                            (
                                ExecutionEventInner::Finished {
                                    result,
                                    http_client_traces,
                                },
                                child_finished,
                                version,
                            )
                        }
                    }
                    WorkerError::DbError(db_error) => {
                        return Err(db_error);
                    }
                    WorkerError::ActivityTrap {
                        reason: reason_inner,
                        trap_kind,
                        detail,
                        version,
                        http_client_traces,
                    } => {
                        if let Some(duration) = can_be_retried {
                            let expires_at = result_obtained_at + duration;
                            debug!(
                                "Retrying activity {trap_kind} execution after {duration:?} at {expires_at}"
                            );
                            (
                                ExecutionEventInner::TemporarilyFailed {
                                    backoff_expires_at: expires_at,
                                    reason_full: StrVariant::from(reason),
                                    reason_inner: StrVariant::from(reason_inner),
                                    detail: Some(detail),
                                    http_client_traces,
                                },
                                None,
                                version,
                            )
                        } else {
                            // Retries exhausted: report a permanent failure.
                            info!(
                                "Activity {trap_kind} marked as permanent failure - {reason_inner}"
                            );
                            let result = Err(FinishedExecutionError::PermanentFailure {
                                reason_inner,
                                reason_full: reason,
                                kind: PermanentFailureKind::ActivityTrap,
                                detail: Some(detail),
                            });
                            let child_finished =
                                parent.map(|(parent_execution_id, parent_join_set)| {
                                    ChildFinishedResponse {
                                        parent_execution_id,
                                        parent_join_set,
                                        result: result.clone(),
                                    }
                                });
                            (
                                ExecutionEventInner::Finished {
                                    result,
                                    http_client_traces,
                                },
                                child_finished,
                                version,
                            )
                        }
                    }
                    // The worker is expected to return this only while a retry is still
                    // possible; the `expect` enforces that contract.
                    WorkerError::ActivityReturnedError {
                        detail,
                        version,
                        http_client_traces,
                    } => {
                        let duration = can_be_retried.expect(
                            "ActivityReturnedError must not be returned when retries are exhausted",
                        );
                        let expires_at = result_obtained_at + duration;
                        debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
                        (
                            ExecutionEventInner::TemporarilyFailed {
                                backoff_expires_at: expires_at,
                                reason_full: StrVariant::Static("activity returned error"),
                                reason_inner: StrVariant::Static("activity returned error"),
                                detail,
                                http_client_traces,
                            },
                            None,
                            version,
                        )
                    }
                    // Always temporary; the `expect` encodes the assumption that workflows
                    // have unlimited retries.
                    WorkerError::TemporaryWorkflowTrap {
                        reason: reason_inner,
                        kind,
                        detail,
                        version,
                    } => {
                        let duration = can_be_retried.expect("workflows are retried forever");
                        let expires_at = result_obtained_at + duration;
                        debug!(
                            "Retrying workflow {kind} execution after {duration:?} at {expires_at}"
                        );
                        (
                            ExecutionEventInner::TemporarilyFailed {
                                backoff_expires_at: expires_at,
                                reason_full: StrVariant::from(reason),
                                reason_inner: StrVariant::from(reason_inner),
                                detail,
                                http_client_traces: None,
                            },
                            None,
                            version,
                        )
                    }
                    // Unlock and back off independently of the retry budget.
                    WorkerError::LimitReached {
                        reason: inner_reason,
                        version: new_version,
                    } => {
                        let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
                        warn!(
                            "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
                        );
                        (
                            ExecutionEventInner::Unlocked {
                                backoff_expires_at: expires_at,
                                reason: StrVariant::from(reason),
                            },
                            None,
                            new_version,
                        )
                    }
                    // Fatal errors are never retried.
                    WorkerError::FatalError(fatal_error, version) => {
                        info!("Fatal worker error - {fatal_error:?}");
                        let result = Err(FinishedExecutionError::from(fatal_error));
                        let child_finished =
                            parent.map(|(parent_execution_id, parent_join_set)| {
                                ChildFinishedResponse {
                                    parent_execution_id,
                                    parent_join_set,
                                    result: result.clone(),
                                }
                            });
                        (
                            ExecutionEventInner::Finished {
                                result,
                                http_client_traces: None,
                            },
                            child_finished,
                            version,
                        )
                    }
                };
                Some(Append {
                    created_at: result_obtained_at,
                    primary_event: AppendRequest {
                        created_at: result_obtained_at,
                        event: primary_event,
                    },
                    execution_id,
                    version,
                    child_finished,
                })
            }
        })
    }
560}
561
/// Notification for a parent execution that one of its child executions has finished.
#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    /// Execution to be notified.
    pub(crate) parent_execution_id: ExecutionId,
    /// Join set of the parent that receives the response.
    pub(crate) parent_join_set: JoinSetId,
    /// Final result of the child; same value as in the child's `Finished` event.
    pub(crate) result: FinishedExecutionResult,
}
568
/// Events produced from a single worker result, ready to be persisted by `Append::append`.
#[derive(Debug, Clone)]
pub(crate) struct Append {
    /// Timestamp used for the append batch and for any parent response.
    pub(crate) created_at: DateTime<Utc>,
    /// Event appended to the execution's own log.
    pub(crate) primary_event: AppendRequest,
    pub(crate) execution_id: ExecutionId,
    /// Version passed to the database append call.
    pub(crate) version: Version,
    /// Present only when `primary_event` is `Finished` and the execution has a parent.
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}
577
578impl Append {
579 pub(crate) async fn append(self, db_connection: &impl DbConnection) -> Result<(), DbError> {
580 if let Some(child_finished) = self.child_finished {
581 assert_matches!(
582 &self.primary_event,
583 AppendRequest {
584 event: ExecutionEventInner::Finished { .. },
585 ..
586 }
587 );
588 let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
589 db_connection
590 .append_batch_respond_to_parent(
591 derived.clone(),
592 self.created_at,
593 vec![self.primary_event],
594 self.version.clone(),
595 child_finished.parent_execution_id,
596 JoinSetResponseEventOuter {
597 created_at: self.created_at,
598 event: JoinSetResponseEvent {
599 join_set_id: child_finished.parent_join_set,
600 event: JoinSetResponse::ChildExecutionFinished {
601 child_execution_id: derived,
602 finished_version: self.version,
604 result: child_finished.result,
605 },
606 },
607 },
608 )
609 .await?;
610 } else {
611 db_connection
612 .append_batch(
613 self.created_at,
614 vec![self.primary_event],
615 self.execution_id,
616 self.version,
617 )
618 .await?;
619 }
620 Ok(())
621 }
622}
623
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    /// Function exported by a [`SimpleWorker`] unless replaced via [`SimpleWorker::with_ffqn`].
    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");

    /// Scripted worker results keyed by the version each run expects to see, paired with
    /// the expected event history. [`SimpleWorker::run`] consumes entries from the end.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    /// Test double for [`Worker`] that replays pre-scripted results.
    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    /// Metadata for the single submittable function exported by a `SimpleWorker`.
    fn single_export(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
        [FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: None,
            extension: None,
            submittable: true,
        }]
    }

    impl SimpleWorker {
        /// Worker expecting exactly one run, at version 2 with an empty event history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let script: IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)> =
                IndexMap::from([(Version::new(2), (vec![], res))]);
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(script)))
        }

        /// Replaces the exported function while keeping the scripted results.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            let exported = single_export(ffqn.clone());
            Self {
                worker_results_rev: self.worker_results_rev,
                ffqn,
                exported,
            }
        }

        /// Worker replaying `worker_results_rev` and exporting [`FFQN_SOME`].
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: single_export(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        /// Pops the last scripted entry, asserts that `ctx` carries the expected version and
        /// event history, then returns the scripted result.
        ///
        /// # Panics
        /// If no scripted results remain, the mutex is poisoned, or `ctx` does not match.
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            let (expected_version, (expected_eh, worker_result)) = {
                let mut remaining = self.worker_results_rev.lock().unwrap();
                remaining.pop().unwrap()
            };
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }

        fn imported_functions(&self) -> &[FunctionMetadata] {
            &[]
        }
    }
}
707
708#[cfg(test)]
709mod tests {
710 use self::simple_worker::SimpleWorker;
711 use super::*;
712 use crate::worker::FatalError;
713 use crate::{expired_timers_watcher, worker::WorkerResult};
714 use assert_matches::assert_matches;
715 use async_trait::async_trait;
716 use concepts::storage::{CreateRequest, JoinSetRequest};
717 use concepts::storage::{
718 DbConnection, ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState,
719 };
720 use concepts::time::Now;
721 use concepts::{
722 ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, StrVariant,
723 SupportedFunctionReturnValue, TrapKind,
724 };
725 use db_tests::Database;
726 use indexmap::IndexMap;
727 use simple_worker::FFQN_SOME;
728 use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
729 use test_utils::set_up;
730 use test_utils::sim_clock::{ConstClock, SimClock};
731
    /// Function name for child executions (NOTE(review): presumably used by the child
    /// execution tests further down this module; confirm).
    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
733
734 async fn tick_fn<
735 W: Worker + Debug,
736 C: ClockFn + 'static,
737 DB: DbConnection + 'static,
738 P: DbPool<DB> + 'static,
739 >(
740 config: ExecConfig,
741 clock_fn: C,
742 db_pool: P,
743 worker: Arc<W>,
744 executed_at: DateTime<Utc>,
745 ) -> ExecutionProgress {
746 trace!("Ticking with {worker:?}");
747 let ffqns = super::extract_ffqns(worker.as_ref());
748 let executor = ExecTask::new(worker, config, clock_fn, db_pool, ffqns);
749 let mut execution_progress = executor.tick(executed_at).await.unwrap();
750 loop {
751 execution_progress
752 .executions
753 .retain(|(_, abort_handle)| !abort_handle.is_finished());
754 if execution_progress.executions.is_empty() {
755 return execution_progress;
756 }
757 tokio::time::sleep(Duration::from_millis(10)).await;
758 }
759 }
760
761 #[tokio::test]
762 async fn execute_simple_lifecycle_tick_based_mem() {
763 let created_at = Now.now();
764 let (_guard, db_pool) = Database::Memory.set_up().await;
765 execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
766 db_pool.close().await.unwrap();
767 }
768
769 #[cfg(not(madsim))]
770 #[tokio::test]
771 async fn execute_simple_lifecycle_tick_based_sqlite() {
772 let created_at = Now.now();
773 let (_guard, db_pool) = Database::Sqlite.set_up().await;
774 execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
775 db_pool.close().await.unwrap();
776 }
777
778 async fn execute_simple_lifecycle_tick_based<
779 DB: DbConnection + 'static,
780 P: DbPool<DB> + 'static,
781 C: ClockFn + 'static,
782 >(
783 pool: P,
784 clock_fn: C,
785 ) {
786 set_up();
787 let created_at = clock_fn.now();
788 let exec_config = ExecConfig {
789 batch_size: 1,
790 lock_expiry: Duration::from_secs(1),
791 tick_sleep: Duration::from_millis(100),
792 component_id: ComponentId::dummy_activity(),
793 task_limiter: None,
794 };
795
796 let execution_log = create_and_tick(
797 CreateAndTickConfig {
798 execution_id: ExecutionId::generate(),
799 created_at,
800 max_retries: 0,
801 executed_at: created_at,
802 retry_exp_backoff: Duration::ZERO,
803 },
804 clock_fn,
805 pool,
806 exec_config,
807 Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
808 SupportedFunctionReturnValue::None,
809 Version::new(2),
810 None,
811 ))),
812 tick_fn,
813 )
814 .await;
815 assert_matches!(
816 execution_log.events.get(2).unwrap(),
817 ExecutionEvent {
818 event: ExecutionEventInner::Finished {
819 result: Ok(SupportedFunctionReturnValue::None),
820 http_client_traces: None
821 },
822 created_at: _,
823 backtrace_id: None,
824 }
825 );
826 }
827
    /// Same lifecycle as the tick-based tests, but driven by an executor started with
    /// `ExecTask::spawn_new`. The `tick` closure given to `create_and_tick` only sleeps
    /// for one second, so the spawned executor must lock and finish the execution within
    /// that time; the outcome is therefore timing-dependent.
    #[tokio::test]
    async fn stochastic_execute_simple_lifecycle_task_based_mem() {
        set_up();
        let created_at = Now.now();
        let clock_fn = ConstClock(created_at);
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };

        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
            SupportedFunctionReturnValue::None,
            Version::new(2),
            None,
        )));
        // Started before the execution exists; it picks the execution up once created.
        let exec_task = ExecTask::spawn_new(
            worker.clone(),
            exec_config.clone(),
            clock_fn,
            db_pool.clone(),
            ExecutorId::generate(),
        );

        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            db_pool.clone(),
            exec_config,
            worker,
            // No manual tick: just give the spawned executor time to do the work.
            |_, _, _, _, _| async {
                tokio::time::sleep(Duration::from_secs(1)).await;
                ExecutionProgress::default()
            },
        )
        .await;
        exec_task.close().await;
        db_pool.close().await.unwrap();
        assert_matches!(
            execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Ok(SupportedFunctionReturnValue::None),
                    http_client_traces: None
                },
                created_at: _,
                backtrace_id: None,
            }
        );
    }
887
888 struct CreateAndTickConfig {
889 execution_id: ExecutionId,
890 created_at: DateTime<Utc>,
891 max_retries: u32,
892 executed_at: DateTime<Utc>,
893 retry_exp_backoff: Duration,
894 }
895
    /// Creates an execution of `FFQN_SOME` described by `config`, runs `tick` once, and
    /// asserts the common prefix of the resulting log:
    /// 1. `Created` at `config.created_at`,
    /// 2. `Locked` no earlier than creation,
    /// 3. a third event no earlier than the lock.
    ///
    /// Returns the full execution log for test-specific assertions.
    async fn create_and_tick<
        W: Worker,
        C: ClockFn,
        DB: DbConnection,
        P: DbPool<DB>,
        T: FnMut(ExecConfig, C, P, Arc<W>, DateTime<Utc>) -> F,
        F: Future<Output = ExecutionProgress>,
    >(
        config: CreateAndTickConfig,
        clock_fn: C,
        db_pool: P,
        exec_config: ExecConfig,
        worker: Arc<W>,
        mut tick: T,
    ) -> ExecutionLog {
        let db_connection = db_pool.connection();
        db_connection
            .create(CreateRequest {
                created_at: config.created_at,
                execution_id: config.execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: config.created_at,
                retry_exp_backoff: config.retry_exp_backoff,
                max_retries: config.max_retries,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();
        // `tick` takes the pool by value, so the connection obtained above is reused below.
        tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
        debug!("Execution history after tick: {execution_log:?}");
        assert_matches!(
            execution_log.events.first().unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Created { .. },
                created_at: actually_created_at,
                backtrace_id: None,
            }
            if config.created_at == *actually_created_at
        );
        let locked_at = assert_matches!(
            execution_log.events.get(1).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: locked_at,
                backtrace_id: None,
            } if config.created_at <= *locked_at
            => *locked_at
        );
        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
            event: _,
            created_at: executed_at,
            backtrace_id: None,
        } if *executed_at >= locked_at);
        execution_log
    }
959
    /// With `max_retries: 1`, an activity trap must produce a `TemporarilyFailed` event
    /// that expires after `retry_exp_backoff`. Ticking before the backoff expires must
    /// lock nothing; ticking after it must lock the execution again and finish it
    /// successfully.
    #[expect(clippy::too_many_lines)]
    #[tokio::test]
    async fn activity_trap_should_trigger_an_execution_retry() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };
        let expected_reason = "error reason";
        let expected_detail = "error detail";
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: expected_detail.to_string(),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        let retry_exp_backoff = Duration::from_millis(100);
        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at: sim_clock.now(),
                max_retries: 1,
                executed_at: sim_clock.now(),
                retry_exp_backoff,
            },
            sim_clock.clone(),
            db_pool.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        // Created, Locked, TemporarilyFailed.
        assert_eq!(3, execution_log.events.len());
        {
            let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
                &execution_log.events.get(2).unwrap(),
                ExecutionEvent {
                    event: ExecutionEventInner::TemporarilyFailed {
                        reason_inner,
                        reason_full,
                        detail,
                        backoff_expires_at,
                        http_client_traces: None,
                    },
                    created_at: at,
                    backtrace_id: None,
                }
                => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
            );
            assert_eq!(expected_reason, reason_inner.deref());
            assert_eq!(
                format!("activity trap: {expected_reason}"),
                reason_full.deref()
            );
            assert_eq!(Some(expected_detail), detail.as_deref());
            assert_eq!(at, sim_clock.now());
            assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
        }
        // The retried run is expected at version 4 (`SimpleWorker::run` asserts `ctx.version`).
        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
            std::sync::Mutex::new(IndexMap::from([(
                Version::new(4),
                (
                    vec![],
                    WorkerResult::Ok(SupportedFunctionReturnValue::None, Version::new(4), None),
                ),
            )])),
        )));
        assert!(
            // The clock has not moved yet, so the backoff is still active: nothing is locked.
            tick_fn(
                exec_config.clone(),
                sim_clock.clone(),
                db_pool.clone(),
                worker.clone(),
                sim_clock.now(),
            )
            .await
            .executions
            .is_empty()
        );
        sim_clock.move_time_forward(retry_exp_backoff).await;
        // Backoff elapsed: the execution is locked again and this time finishes.
        tick_fn(
            exec_config,
            sim_clock.clone(),
            db_pool.clone(),
            worker,
            sim_clock.now(),
        )
        .await;
        let execution_log = {
            let db_connection = db_pool.connection();
            db_connection
                .get(&execution_log.execution_id)
                .await
                .unwrap()
        };
        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
        assert_matches!(
            execution_log.events.get(3).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: at,
                backtrace_id: None,
            } if *at == sim_clock.now()
        );
        assert_matches!(
            execution_log.events.get(4).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Ok(SupportedFunctionReturnValue::None),
                    http_client_traces: None
                },
                created_at: finished_at,
                backtrace_id: None,
            } if *finished_at == sim_clock.now()
        );
        db_pool.close().await.unwrap();
    }
1088
    /// With `max_retries: 0`, an activity trap must finish the execution immediately with
    /// a `PermanentFailure` of kind `ActivityTrap` that preserves the reason and detail.
    #[tokio::test]
    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
        set_up();
        let created_at = Now.now();
        let clock_fn = ConstClock(created_at);
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };

        let expected_reason = "error reason";
        let expected_detail = "error detail";
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: expected_detail.to_string(),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            db_pool.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        // Created, Locked, Finished — no temporary event in between.
        assert_eq!(3, execution_log.events.len());
        let (reason, kind, detail) = assert_matches!(
            &execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished{
                    result: Err(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_}),
                    http_client_traces: None
                },
                created_at: at,
                backtrace_id: None,
            } if *at == created_at
            => (reason_inner, kind, detail)
        );
        assert_eq!(expected_reason, *reason);
        assert_eq!(Some(expected_detail), detail.as_deref());
        assert_eq!(PermanentFailureKind::ActivityTrap, *kind);

        db_pool.close().await.unwrap();
    }
1148
1149 #[tokio::test]
1150 async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1151 let worker_error = WorkerError::ActivityTrap {
1152 reason: "error reason".to_string(),
1153 trap_kind: TrapKind::Trap,
1154 detail: "detail".to_string(),
1155 version: Version::new(2),
1156 http_client_traces: None,
1157 };
1158 let expected_child_err = FinishedExecutionError::PermanentFailure {
1159 reason_full: "activity trap: error reason".to_string(),
1160 reason_inner: "error reason".to_string(),
1161 kind: PermanentFailureKind::ActivityTrap,
1162 detail: Some("detail".to_string()),
1163 };
1164 child_execution_permanently_failed_should_notify_parent(
1165 WorkerResult::Err(worker_error),
1166 expected_child_err,
1167 )
1168 .await;
1169 }
1170
1171 #[tokio::test]
1174 async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1175 let expected_child_err = FinishedExecutionError::PermanentTimeout;
1176 child_execution_permanently_failed_should_notify_parent(
1177 WorkerResult::DbUpdatedByWorkerOrWatcher,
1178 expected_child_err,
1179 )
1180 .await;
1181 }
1182
1183 #[tokio::test]
1184 async fn child_execution_permanently_failed_should_notify_parent_unhandled_child() {
1185 let parent_id = ExecutionId::from_parts(1, 1);
1186 let join_set_id_outer =
1187 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("outer")).unwrap();
1188 let root_cause_id = parent_id.next_level(&join_set_id_outer);
1189 let join_set_id_inner =
1190 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("inner")).unwrap();
1191 let child_execution_id = root_cause_id.next_level(&join_set_id_inner);
1192 let worker_error = WorkerError::FatalError(
1193 FatalError::UnhandledChildExecutionError {
1194 child_execution_id: child_execution_id.clone(),
1195 root_cause_id: root_cause_id.clone(),
1196 },
1197 Version::new(2),
1198 );
1199 let expected_child_err = FinishedExecutionError::UnhandledChildExecutionError {
1200 child_execution_id,
1201 root_cause_id,
1202 };
1203 child_execution_permanently_failed_should_notify_parent(
1204 WorkerResult::Err(worker_error),
1205 expected_child_err,
1206 )
1207 .await;
1208 }
1209
/// Shared scenario for the `child_execution_permanently_failed_*` tests.
///
/// 1. Creates a parent execution (`FFQN_SOME`, no retries) and ticks it with a worker
///    returning `WorkerResult::DbUpdatedByWorkerOrWatcher`.
/// 2. In a single `append_batch_create_new_execution` call, appends `JoinSetCreate`,
///    `JoinSetRequest::ChildExecutionRequest` and `JoinNext` to the parent and creates
///    the child execution (`FFQN_CHILD`, no retries).
/// 3. Ticks the child with a worker returning `worker_result`.
/// 4. If `expected_child_err` is `PermanentTimeout`, moves the clock by the lock expiry
///    and ticks the expired-timers watcher so the watcher finishes the child.
///
/// Asserts the following:
/// - The child is finished with `Err(expected_child_err)` and `next_version == 2`.
/// - The parent is back to `PendingAt` the current time.
/// - The parent's last response is a `ChildExecutionFinished` for this join set and
///   child, whose `finished_version` equals the child's `next_version` and whose
///   result is an error.
#[expect(clippy::too_many_lines)]
async fn child_execution_permanently_failed_should_notify_parent(
    worker_result: WorkerResult,
    expected_child_err: FinishedExecutionError,
) {
    use concepts::storage::JoinSetResponseEventOuter;
    const LOCK_EXPIRY: Duration = Duration::from_secs(1);

    set_up();
    let sim_clock = SimClock::default();
    let (_guard, db_pool) = Database::Memory.set_up().await;

    // NOTE(review): presumably this result makes the executor leave the parent's DB state
    // alone, so the batch append below can continue at version 2; confirm in the executor.
    let parent_worker = Arc::new(SimpleWorker::with_single_result(
        WorkerResult::DbUpdatedByWorkerOrWatcher,
    ));
    let parent_execution_id = ExecutionId::generate();
    db_pool
        .connection()
        .create(CreateRequest {
            created_at: sim_clock.now(),
            execution_id: parent_execution_id.clone(),
            ffqn: FFQN_SOME,
            params: Params::empty(),
            parent: None,
            metadata: concepts::ExecutionMetadata::empty(),
            scheduled_at: sim_clock.now(),
            retry_exp_backoff: Duration::ZERO,
            max_retries: 0,
            component_id: ComponentId::dummy_activity(),
            scheduled_by: None,
        })
        .await
        .unwrap();
    // Lock and run the parent once.
    tick_fn(
        ExecConfig {
            batch_size: 1,
            lock_expiry: LOCK_EXPIRY,
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        },
        sim_clock.clone(),
        db_pool.clone(),
        parent_worker,
        sim_clock.now(),
    )
    .await;

    let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
    let child_execution_id = parent_execution_id.next_level(&join_set_id);
    {
        // Child execution, linked back to the parent through `join_set_id`.
        let child = CreateRequest {
            created_at: sim_clock.now(),
            execution_id: ExecutionId::Derived(child_execution_id.clone()),
            ffqn: FFQN_CHILD,
            params: Params::empty(),
            parent: Some((parent_execution_id.clone(), join_set_id.clone())),
            metadata: concepts::ExecutionMetadata::empty(),
            scheduled_at: sim_clock.now(),
            retry_exp_backoff: Duration::ZERO,
            max_retries: 0,
            component_id: ComponentId::dummy_activity(),
            scheduled_by: None,
        };
        let current_time = sim_clock.now();
        let join_set = AppendRequest {
            created_at: current_time,
            event: ExecutionEventInner::HistoryEvent {
                event: HistoryEvent::JoinSetCreate {
                    join_set_id: join_set_id.clone(),
                    closing_strategy: ClosingStrategy::Complete,
                },
            },
        };
        let child_exec_req = AppendRequest {
            created_at: current_time,
            event: ExecutionEventInner::HistoryEvent {
                event: HistoryEvent::JoinSetRequest {
                    join_set_id: join_set_id.clone(),
                    request: JoinSetRequest::ChildExecutionRequest {
                        child_execution_id: child_execution_id.clone(),
                    },
                },
            },
        };
        // The parent waits on the join set, so the child's result is what wakes it up.
        let join_next = AppendRequest {
            created_at: current_time,
            event: ExecutionEventInner::HistoryEvent {
                event: HistoryEvent::JoinNext {
                    join_set_id: join_set_id.clone(),
                    run_expires_at: sim_clock.now(),
                    closing: false,
                },
            },
        };
        // Appends the three parent events and creates the child in one call.
        db_pool
            .connection()
            .append_batch_create_new_execution(
                current_time,
                vec![join_set, child_exec_req, join_next],
                parent_execution_id.clone(),
                Version::new(2),
                vec![child],
            )
            .await
            .unwrap();
    }

    let child_worker =
        Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));

    tick_fn(
        ExecConfig {
            batch_size: 1,
            lock_expiry: LOCK_EXPIRY,
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        },
        sim_clock.clone(),
        db_pool.clone(),
        child_worker,
        sim_clock.now(),
    )
    .await;
    // For the timeout scenario, the worker does not finish the child. Expire its lock
    // and let the watcher record the permanent timeout.
    if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
        sim_clock.move_time_forward(LOCK_EXPIRY).await;
        expired_timers_watcher::tick(&db_pool.connection(), sim_clock.now())
            .await
            .unwrap();
    }
    let child_log = db_pool
        .connection()
        .get(&ExecutionId::Derived(child_execution_id.clone()))
        .await
        .unwrap();
    assert!(child_log.pending_state.is_finished());
    assert_eq!(
        Version(2),
        child_log.next_version,
        "created = 0, locked = 1, with_single_result = 2"
    );
    assert_eq!(
        ExecutionEventInner::Finished {
            result: Err(expected_child_err),
            http_client_traces: None
        },
        child_log.last_event().event
    );
    let parent_log = db_pool
        .connection()
        .get(&parent_execution_id)
        .await
        .unwrap();
    assert_matches!(
        parent_log.pending_state,
        PendingState::PendingAt {
            scheduled_at
        } if scheduled_at == sim_clock.now(),
        "parent should be back to pending"
    );
    // The child's outcome must be delivered to the parent as a join set response.
    let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
        parent_log.responses.last(),
        Some(JoinSetResponseEventOuter{
            created_at: at,
            event: JoinSetResponseEvent{
                join_set_id: found_join_set_id,
                event: JoinSetResponse::ChildExecutionFinished {
                    child_execution_id: found_child_execution_id,
                    finished_version,
                    result: found_result,
                }
            }
        })
        if *at == sim_clock.now()
        => (found_join_set_id, found_child_execution_id, finished_version, found_result)
    );
    assert_eq!(join_set_id, *found_join_set_id);
    assert_eq!(child_execution_id, *found_child_execution_id);
    assert_eq!(child_log.next_version, *child_finished_version);
    assert!(found_result.is_err());

    db_pool.close().await.unwrap();
}
1397
1398 #[derive(Clone, Debug)]
1399 struct SleepyWorker {
1400 duration: Duration,
1401 result: SupportedFunctionReturnValue,
1402 exported: [FunctionMetadata; 1],
1403 }
1404
1405 #[async_trait]
1406 impl Worker for SleepyWorker {
1407 async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1408 tokio::time::sleep(self.duration).await;
1409 WorkerResult::Ok(self.result.clone(), ctx.version, None)
1410 }
1411
1412 fn exported_functions(&self) -> &[FunctionMetadata] {
1413 &self.exported
1414 }
1415
1416 fn imported_functions(&self) -> &[FunctionMetadata] {
1417 &[]
1418 }
1419 }
1420
1421 #[expect(clippy::too_many_lines)]
1422 #[tokio::test]
1423 async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1424 set_up();
1425 let sim_clock = SimClock::default();
1426 let (_guard, db_pool) = Database::Memory.set_up().await;
1427 let lock_expiry = Duration::from_millis(100);
1428 let exec_config = ExecConfig {
1429 batch_size: 1,
1430 lock_expiry,
1431 tick_sleep: Duration::ZERO,
1432 component_id: ComponentId::dummy_activity(),
1433 task_limiter: None,
1434 };
1435
1436 let worker = Arc::new(SleepyWorker {
1437 duration: lock_expiry + Duration::from_millis(1), result: SupportedFunctionReturnValue::None,
1439 exported: [FunctionMetadata {
1440 ffqn: FFQN_SOME,
1441 parameter_types: ParameterTypes::default(),
1442 return_type: None,
1443 extension: None,
1444 submittable: true,
1445 }],
1446 });
1447 let execution_id = ExecutionId::generate();
1449 let timeout_duration = Duration::from_millis(300);
1450 let db_connection = db_pool.connection();
1451 db_connection
1452 .create(CreateRequest {
1453 created_at: sim_clock.now(),
1454 execution_id: execution_id.clone(),
1455 ffqn: FFQN_SOME,
1456 params: Params::empty(),
1457 parent: None,
1458 metadata: concepts::ExecutionMetadata::empty(),
1459 scheduled_at: sim_clock.now(),
1460 retry_exp_backoff: timeout_duration,
1461 max_retries: 1,
1462 component_id: ComponentId::dummy_activity(),
1463 scheduled_by: None,
1464 })
1465 .await
1466 .unwrap();
1467
1468 let ffqns = super::extract_ffqns(worker.as_ref());
1469 let executor = ExecTask::new(
1470 worker,
1471 exec_config,
1472 sim_clock.clone(),
1473 db_pool.clone(),
1474 ffqns,
1475 );
1476 let mut first_execution_progress = executor.tick(sim_clock.now()).await.unwrap();
1477 assert_eq!(1, first_execution_progress.executions.len());
1478 sim_clock.move_time_forward(lock_expiry).await;
1480 let now_after_first_lock_expiry = sim_clock.now();
1482 {
1483 debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1484 let cleanup_progress = executor.tick(now_after_first_lock_expiry).await.unwrap();
1485 assert!(cleanup_progress.executions.is_empty());
1486 }
1487 {
1488 let expired_locks =
1489 expired_timers_watcher::tick(&db_pool.connection(), now_after_first_lock_expiry)
1490 .await
1491 .unwrap()
1492 .expired_locks;
1493 assert_eq!(1, expired_locks);
1494 }
1495 assert!(
1496 !first_execution_progress
1497 .executions
1498 .pop()
1499 .unwrap()
1500 .1
1501 .is_finished()
1502 );
1503
1504 let execution_log = db_connection.get(&execution_id).await.unwrap();
1505 let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1506 assert_matches!(
1507 &execution_log.events.get(2).unwrap(),
1508 ExecutionEvent {
1509 event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
1510 created_at: at,
1511 backtrace_id: None,
1512 } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1513 );
1514 assert_eq!(
1515 PendingState::PendingAt {
1516 scheduled_at: expected_first_timeout_expiry
1517 },
1518 execution_log.pending_state
1519 );
1520 sim_clock.move_time_forward(timeout_duration).await;
1521 let now_after_first_timeout = sim_clock.now();
1522 debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1523
1524 let mut second_execution_progress = executor.tick(now_after_first_timeout).await.unwrap();
1525 assert_eq!(1, second_execution_progress.executions.len());
1526
1527 sim_clock.move_time_forward(lock_expiry).await;
1529 let now_after_second_lock_expiry = sim_clock.now();
1531 debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1532 {
1533 let cleanup_progress = executor.tick(now_after_second_lock_expiry).await.unwrap();
1534 assert!(cleanup_progress.executions.is_empty());
1535 }
1536 {
1537 let expired_locks =
1538 expired_timers_watcher::tick(&db_pool.connection(), now_after_second_lock_expiry)
1539 .await
1540 .unwrap()
1541 .expired_locks;
1542 assert_eq!(1, expired_locks);
1543 }
1544 assert!(
1545 !second_execution_progress
1546 .executions
1547 .pop()
1548 .unwrap()
1549 .1
1550 .is_finished()
1551 );
1552
1553 drop(db_connection);
1554 drop(executor);
1555 db_pool.close().await.unwrap();
1556 }
1557}