1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::storage::{
5 AppendRequest, DbPool, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
6 LockedExecution,
7};
8use concepts::time::ClockFn;
9use concepts::{prefixed_ulid::ExecutorId, ExecutionId, FunctionFqn};
10use concepts::{
11 storage::{DbConnection, DbError, ExecutionEventInner, JoinSetResponse, Version},
12 FinishedExecutionError,
13};
14use concepts::{ComponentId, FinishedExecutionResult, FunctionMetadata, StrVariant};
15use concepts::{JoinSetId, PermanentFailureKind};
16use std::marker::PhantomData;
17use std::{
18 sync::{
19 atomic::{AtomicBool, Ordering},
20 Arc,
21 },
22 time::Duration,
23};
24use tokio::task::{AbortHandle, JoinHandle};
25use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument, Level, Span};
26
/// Configuration of a single executor (`ExecTask`).
#[derive(Debug, Clone)]
pub struct ExecConfig {
    /// Added to the tick time to obtain both the lock expiry passed to `lock_pending` and the
    /// execution deadline handed to workers.
    pub lock_expiry: Duration,
    /// Passed by the spawned executor loop to `wait_for_pending` after every tick
    /// (presumably the maximum time to wait for new work — confirm against `DbConnection`).
    pub tick_sleep: Duration,
    /// Maximum number of executions locked in one tick; may be further reduced by
    /// `task_limiter`.
    pub batch_size: u32,
    /// Passed to `lock_pending` and recorded in the executor's log/tracing fields.
    pub component_id: ComponentId,
    /// Optional semaphore limiting concurrently running worker tasks. When set, every spawned
    /// worker task holds one permit until its worker run completes.
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
}
35
/// Executor that locks pending executions of `ffqns` and runs them with `worker`.
pub struct ExecTask<C: ClockFn, DB: DbConnection, P: DbPool<DB>> {
    worker: Arc<dyn Worker>,
    config: ExecConfig,
    clock_fn: C,
    db_pool: P,
    executor_id: ExecutorId,
    // `DB` only appears in the `DbPool<DB>` bound, so it has to be anchored to the struct.
    phantom_data: PhantomData<DB>,
    // Function names this executor locks executions for; `spawn_new` derives them from the
    // worker's exported functions.
    ffqns: Arc<[FunctionFqn]>,
}
45
46#[derive(derive_more::Debug, Default)]
47pub struct ExecutionProgress {
48 #[debug(skip)]
49 #[allow(dead_code)]
50 executions: Vec<(ExecutionId, JoinHandle<()>)>,
51}
52
53impl ExecutionProgress {
54 #[cfg(feature = "test")]
55 pub async fn wait_for_tasks(self) -> Result<usize, tokio::task::JoinError> {
56 let execs = self.executions.len();
57 for (_, handle) in self.executions {
58 handle.await?;
59 }
60 Ok(execs)
61 }
62}
63
64#[derive(derive_more::Debug)]
65pub struct ExecutorTaskHandle {
66 #[debug(skip)]
67 is_closing: Arc<AtomicBool>,
68 #[debug(skip)]
69 abort_handle: AbortHandle,
70 component_id: ComponentId,
71 executor_id: ExecutorId,
72}
73
74impl ExecutorTaskHandle {
75 #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
76 pub async fn close(&self) {
77 trace!("Gracefully closing");
78 self.is_closing.store(true, Ordering::Relaxed);
79 while !self.abort_handle.is_finished() {
80 tokio::time::sleep(Duration::from_millis(1)).await;
81 }
82 debug!("Gracefully closed");
83 }
84}
85
86impl Drop for ExecutorTaskHandle {
87 #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
88 fn drop(&mut self) {
89 if self.abort_handle.is_finished() {
90 return;
91 }
92 warn!(executor_id= %self.executor_id, component_id=%self.component_id, "Aborting the executor task");
93 self.abort_handle.abort();
94 }
95}
96
97#[cfg(feature = "test")]
98pub fn extract_ffqns_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
99 extract_ffqns(worker)
100}
101
102pub(crate) fn extract_ffqns(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
103 worker
104 .exported_functions()
105 .iter()
106 .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
107 .collect::<Arc<_>>()
108}
109
110impl<C: ClockFn + 'static, DB: DbConnection + 'static, P: DbPool<DB> + 'static> ExecTask<C, DB, P> {
111 #[cfg(feature = "test")]
112 pub fn new(
113 worker: Arc<dyn Worker>,
114 config: ExecConfig,
115 clock_fn: C,
116 db_pool: P,
117 ffqns: Arc<[FunctionFqn]>,
118 ) -> Self {
119 Self {
120 worker,
121 config,
122 executor_id: ExecutorId::generate(),
123 db_pool,
124 phantom_data: PhantomData,
125 ffqns,
126 clock_fn,
127 }
128 }
129
130 pub fn spawn_new(
131 worker: Arc<dyn Worker>,
132 config: ExecConfig,
133 clock_fn: C,
134 db_pool: P,
135 executor_id: ExecutorId,
136 ) -> ExecutorTaskHandle {
137 let is_closing = Arc::new(AtomicBool::default());
138 let is_closing_inner = is_closing.clone();
139 let ffqns = extract_ffqns(worker.as_ref());
140 let component_id = config.component_id.clone();
141 let abort_handle = tokio::spawn(async move {
142 debug!(%executor_id, component_id = %config.component_id, "Spawned executor");
143 let task = Self {
144 worker,
145 config,
146 executor_id,
147 db_pool,
148 phantom_data: PhantomData,
149 ffqns: ffqns.clone(),
150 clock_fn: clock_fn.clone(),
151 };
152 loop {
153 let _ = task.tick(clock_fn.now()).await;
154 let executed_at = clock_fn.now();
155 task.db_pool
156 .connection()
157 .wait_for_pending(executed_at, ffqns.clone(), task.config.tick_sleep)
158 .await;
159 if is_closing_inner.load(Ordering::Relaxed) {
160 return;
161 }
162 }
163 })
164 .abort_handle();
165 ExecutorTaskHandle {
166 is_closing,
167 abort_handle,
168 component_id,
169 executor_id,
170 }
171 }
172
173 fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
174 if let Some(task_limiter) = &self.config.task_limiter {
175 let mut locks = Vec::new();
176 for _ in 0..self.config.batch_size {
177 if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
178 locks.push(Some(permit));
179 } else {
180 break;
181 }
182 }
183 locks
184 } else {
185 let mut vec = Vec::with_capacity(self.config.batch_size as usize);
186 for _ in 0..self.config.batch_size {
187 vec.push(None);
188 }
189 vec
190 }
191 }
192
193 #[cfg(feature = "test")]
194 pub async fn tick_test(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
195 self.tick(executed_at).await
196 }
197
198 #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.executor_id, component_id = %self.config.component_id))]
199 async fn tick(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
200 let locked_executions = {
201 let mut permits = self.acquire_task_permits();
202 if permits.is_empty() {
203 return Ok(ExecutionProgress::default());
204 }
205 let db_connection = self.db_pool.connection();
206 let lock_expires_at = executed_at + self.config.lock_expiry;
207 let locked_executions = db_connection
208 .lock_pending(
209 permits.len(), executed_at, self.ffqns.clone(),
212 executed_at, self.config.component_id.clone(),
214 self.executor_id,
215 lock_expires_at,
216 )
217 .await
218 .map_err(|err| {
219 warn!(executor_id = %self.executor_id, component_id = %self.config.component_id, "lock_pending error {err:?}");
220 })?;
221 while permits.len() > locked_executions.len() {
223 permits.pop();
224 }
225 assert_eq!(permits.len(), locked_executions.len());
226 locked_executions.into_iter().zip(permits)
227 };
228 let execution_deadline = executed_at + self.config.lock_expiry;
229
230 let mut executions = Vec::with_capacity(locked_executions.len());
231 for (locked_execution, permit) in locked_executions {
232 let execution_id = locked_execution.execution_id.clone();
233 let join_handle = {
234 let worker = self.worker.clone();
235 let db_pool = self.db_pool.clone();
236 let clock_fn = self.clock_fn.clone();
237 let run_id = locked_execution.run_id;
238 let worker_span = info_span!(parent: None, "worker",
239 "otel.name" = format!("worker {}", locked_execution.ffqn),
240 %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.executor_id, component_id = %self.config.component_id);
241 locked_execution.metadata.enrich(&worker_span);
242 tokio::spawn({
243 let worker_span2 = worker_span.clone();
244 async move {
245 let res = Self::run_worker(
246 worker,
247 &db_pool,
248 execution_deadline,
249 clock_fn,
250 locked_execution,
251 worker_span2,
252 )
253 .await;
254 if let Err(db_error) = res {
255 error!("Execution will be timed out not writing `{db_error:?}`");
256 }
257 drop(permit);
258 }
259 .instrument(worker_span)
260 })
261 };
262 executions.push((execution_id, join_handle));
263 }
264 Ok(ExecutionProgress { executions })
265 }
266
267 async fn run_worker(
268 worker: Arc<dyn Worker>,
269 db_pool: &P,
270 execution_deadline: DateTime<Utc>,
271 clock_fn: C,
272 locked_execution: LockedExecution,
273 worker_span: Span,
274 ) -> Result<(), DbError> {
275 debug!("Worker::run starting");
276 trace!(
277 version = %locked_execution.version,
278 params = ?locked_execution.params,
279 event_history = ?locked_execution.event_history,
280 "Worker::run starting"
281 );
282 let can_be_retried = ExecutionLog::can_be_retried_after(
283 locked_execution.temporary_event_count + 1,
284 locked_execution.max_retries,
285 locked_execution.retry_exp_backoff,
286 );
287 let unlock_expiry_on_limit_reached =
288 ExecutionLog::compute_retry_duration_when_retrying_forever(
289 locked_execution.temporary_event_count + 1,
290 locked_execution.retry_exp_backoff,
291 );
292 let ctx = WorkerContext {
293 execution_id: locked_execution.execution_id.clone(),
294 metadata: locked_execution.metadata,
295 ffqn: locked_execution.ffqn,
296 params: locked_execution.params,
297 event_history: locked_execution.event_history,
298 responses: locked_execution
299 .responses
300 .into_iter()
301 .map(|outer| outer.event)
302 .collect(),
303 version: locked_execution.version,
304 execution_deadline,
305 can_be_retried: can_be_retried.is_some(),
306 run_id: locked_execution.run_id,
307 worker_span,
308 };
309 let worker_result = worker.run(ctx).await;
310 trace!(?worker_result, "Worker::run finished");
311 let result_obtained_at = clock_fn.now();
312 match Self::worker_result_to_execution_event(
313 locked_execution.execution_id,
314 worker_result,
315 result_obtained_at,
316 locked_execution.parent,
317 can_be_retried,
318 unlock_expiry_on_limit_reached,
319 )? {
320 Some(append) => {
321 let db_connection = db_pool.connection();
322 trace!("Appending {append:?}");
323 append.clone().append(&db_connection).await
324 }
325 None => Ok(()),
326 }
327 }
328
329 #[expect(clippy::too_many_lines)]
332 fn worker_result_to_execution_event(
333 execution_id: ExecutionId,
334 worker_result: WorkerResult,
335 result_obtained_at: DateTime<Utc>,
336 parent: Option<(ExecutionId, JoinSetId)>,
337 can_be_retried: Option<Duration>,
338 unlock_expiry_on_limit_reached: Duration,
339 ) -> Result<Option<Append>, DbError> {
340 Ok(match worker_result {
341 WorkerResult::Ok(result, new_version, http_client_traces) => {
342 info!(
343 "Execution finished: {}",
344 result.as_pending_state_finished_result()
345 );
346 let child_finished =
347 parent.map(
348 |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
349 parent_execution_id,
350 parent_join_set,
351 result: Ok(result.clone()),
352 },
353 );
354 let primary_event = AppendRequest {
355 created_at: result_obtained_at,
356 event: ExecutionEventInner::Finished {
357 result: Ok(result),
358 http_client_traces,
359 },
360 };
361
362 Some(Append {
363 created_at: result_obtained_at,
364 primary_event,
365 execution_id,
366 version: new_version,
367 child_finished,
368 })
369 }
370 WorkerResult::DbUpdatedByWorker => None,
371 WorkerResult::Err(err) => {
372 let reason = err.to_string();
373 let (primary_event, child_finished, version) = match err {
374 WorkerError::TemporaryTimeoutHandledByWatcher => {
375 info!("Temporary timeout handled by watcher");
376 return Ok(None);
377 }
378 WorkerError::TemporaryTimeout {
379 http_client_traces,
380 version,
381 } => {
382 if let Some(duration) = can_be_retried {
383 let backoff_expires_at = result_obtained_at + duration;
384 info!("Temporary timeout after {duration:?} at {backoff_expires_at}");
385 (
386 ExecutionEventInner::TemporarilyTimedOut {
387 backoff_expires_at,
388 http_client_traces,
389 },
390 None,
391 version,
392 )
393 } else {
394 info!("Permanent timeout");
395 let result = Err(FinishedExecutionError::PermanentTimeout);
396 let child_finished =
397 parent.map(|(parent_execution_id, parent_join_set)| {
398 ChildFinishedResponse {
399 parent_execution_id,
400 parent_join_set,
401 result: result.clone(),
402 }
403 });
404 (
405 ExecutionEventInner::Finished {
406 result,
407 http_client_traces,
408 },
409 child_finished,
410 version,
411 )
412 }
413 }
414 WorkerError::DbError(db_error) => {
415 return Err(db_error);
416 }
417 WorkerError::ActivityTrap {
418 reason: reason_inner,
419 trap_kind,
420 detail,
421 version,
422 http_client_traces,
423 } => {
424 if let Some(duration) = can_be_retried {
425 let expires_at = result_obtained_at + duration;
426 debug!(
427 "Retrying activity {trap_kind} execution after {duration:?} at {expires_at}"
428 );
429 (
430 ExecutionEventInner::TemporarilyFailed {
431 backoff_expires_at: expires_at,
432 reason_full: StrVariant::from(reason),
433 reason_inner: StrVariant::from(reason_inner),
434 detail: Some(detail),
435 http_client_traces,
436 },
437 None,
438 version,
439 )
440 } else {
441 info!(
442 "Activity {trap_kind} marked as permanent failure - {reason_inner}"
443 );
444 let result = Err(FinishedExecutionError::PermanentFailure {
445 reason_inner,
446 reason_full: reason,
447 kind: PermanentFailureKind::ActivityTrap,
448 detail: Some(detail),
449 });
450 let child_finished =
451 parent.map(|(parent_execution_id, parent_join_set)| {
452 ChildFinishedResponse {
453 parent_execution_id,
454 parent_join_set,
455 result: result.clone(),
456 }
457 });
458 (
459 ExecutionEventInner::Finished {
460 result,
461 http_client_traces,
462 },
463 child_finished,
464 version,
465 )
466 }
467 }
468 WorkerError::ActivityReturnedError {
469 detail,
470 version,
471 http_client_traces,
472 } => {
473 let duration = can_be_retried.expect(
474 "ActivityReturnedError must not be returned when retries are exhausted",
475 );
476 let expires_at = result_obtained_at + duration;
477 debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
478 (
479 ExecutionEventInner::TemporarilyFailed {
480 backoff_expires_at: expires_at,
481 reason_full: StrVariant::Static("activity returned error"),
482 reason_inner: StrVariant::Static("activity returned error"),
483 detail,
484 http_client_traces,
485 },
486 None,
487 version,
488 )
489 }
490 WorkerError::TemporaryWorkflowTrap {
491 reason: reason_inner,
492 kind,
493 detail,
494 version,
495 } => {
496 let duration = can_be_retried.expect("workflows are retried forever");
497 let expires_at = result_obtained_at + duration;
498 debug!(
499 "Retrying workflow {kind} execution after {duration:?} at {expires_at}"
500 );
501 (
502 ExecutionEventInner::TemporarilyFailed {
503 backoff_expires_at: expires_at,
504 reason_full: StrVariant::from(reason),
505 reason_inner: StrVariant::from(reason_inner),
506 detail,
507 http_client_traces: None,
508 },
509 None,
510 version,
511 )
512 }
513 WorkerError::LimitReached {
514 reason: inner_reason,
515 version: new_version,
516 } => {
517 let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
518 warn!(
519 "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
520 );
521 (
522 ExecutionEventInner::Unlocked {
523 backoff_expires_at: expires_at,
524 reason: StrVariant::from(reason),
525 },
526 None,
527 new_version,
528 )
529 }
530 WorkerError::FatalError(fatal_error, version) => {
531 info!("Fatal worker error - {fatal_error:?}");
532 let result = Err(FinishedExecutionError::from(fatal_error));
533 let child_finished =
534 parent.map(|(parent_execution_id, parent_join_set)| {
535 ChildFinishedResponse {
536 parent_execution_id,
537 parent_join_set,
538 result: result.clone(),
539 }
540 });
541 (
542 ExecutionEventInner::Finished {
543 result,
544 http_client_traces: None,
545 },
546 child_finished,
547 version,
548 )
549 }
550 };
551 Some(Append {
552 created_at: result_obtained_at,
553 primary_event: AppendRequest {
554 created_at: result_obtained_at,
555 event: primary_event,
556 },
557 execution_id,
558 version,
559 child_finished,
560 })
561 }
562 })
563 }
564}
565
566#[derive(Debug, Clone)]
567pub(crate) struct ChildFinishedResponse {
568 pub(crate) parent_execution_id: ExecutionId,
569 pub(crate) parent_join_set: JoinSetId,
570 pub(crate) result: FinishedExecutionResult,
571}
572
573#[derive(Debug, Clone)]
574pub(crate) struct Append {
575 pub(crate) created_at: DateTime<Utc>,
576 pub(crate) primary_event: AppendRequest,
577 pub(crate) execution_id: ExecutionId,
578 pub(crate) version: Version,
579 pub(crate) child_finished: Option<ChildFinishedResponse>,
580}
581
582impl Append {
583 pub(crate) async fn append(self, db_connection: &impl DbConnection) -> Result<(), DbError> {
584 if let Some(child_finished) = self.child_finished {
585 assert_matches!(
586 &self.primary_event,
587 AppendRequest {
588 event: ExecutionEventInner::Finished { .. },
589 ..
590 }
591 );
592 let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
593 db_connection
594 .append_batch_respond_to_parent(
595 derived.clone(),
596 self.created_at,
597 vec![self.primary_event],
598 self.version.clone(),
599 child_finished.parent_execution_id,
600 JoinSetResponseEventOuter {
601 created_at: self.created_at,
602 event: JoinSetResponseEvent {
603 join_set_id: child_finished.parent_join_set,
604 event: JoinSetResponse::ChildExecutionFinished {
605 child_execution_id: derived,
606 finished_version: self.version,
608 result: child_finished.result,
609 },
610 },
611 },
612 )
613 .await?;
614 } else {
615 db_connection
616 .append_batch(
617 self.created_at,
618 vec![self.primary_event],
619 self.execution_id,
620 self.version,
621 )
622 .await?;
623 }
624 Ok(())
625 }
626}
627
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        storage::{HistoryEvent, Version},
        FunctionFqn, FunctionMetadata, ParameterTypes,
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");

    /// Scripted worker results keyed by the version the worker expects to be run with.
    /// Entries are consumed from the back via `IndexMap::pop`.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    /// Test worker that replays scripted results and asserts on the context it is given.
    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    /// Builds the single exported-function entry advertised for `ffqn`.
    fn exported_metadata(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
        [FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: None,
            extension: None,
            submittable: true,
        }]
    }

    impl SimpleWorker {
        /// Worker expecting exactly one run at version 2 with an empty event history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let scripted = IndexMap::from([(Version::new(2), (vec![], res))]);
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(scripted)))
        }

        /// Replaces the exported function name, keeping the scripted results.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            Self {
                exported: exported_metadata(ffqn.clone()),
                ffqn,
                ..self
            }
        }

        /// Worker exporting `FFQN_SOME` and replaying `worker_results_rev`.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: exported_metadata(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            let (expected_version, (expected_eh, worker_result)) =
                self.worker_results_rev.lock().unwrap().pop().unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }

        fn imported_functions(&self) -> &[FunctionMetadata] {
            &[]
        }
    }
}
711
712#[cfg(test)]
713mod tests {
714 use self::simple_worker::SimpleWorker;
715 use super::*;
716 use crate::worker::FatalError;
717 use crate::{expired_timers_watcher, worker::WorkerResult};
718 use assert_matches::assert_matches;
719 use async_trait::async_trait;
720 use concepts::storage::{CreateRequest, JoinSetRequest};
721 use concepts::storage::{
722 DbConnection, ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState,
723 };
724 use concepts::time::Now;
725 use concepts::{
726 ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, StrVariant,
727 SupportedFunctionReturnValue, TrapKind,
728 };
729 use db_tests::Database;
730 use indexmap::IndexMap;
731 use simple_worker::FFQN_SOME;
732 use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
733 use test_utils::set_up;
734 use test_utils::sim_clock::{ConstClock, SimClock};
735
736 pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
737
738 async fn tick_fn<
739 W: Worker + Debug,
740 C: ClockFn + 'static,
741 DB: DbConnection + 'static,
742 P: DbPool<DB> + 'static,
743 >(
744 config: ExecConfig,
745 clock_fn: C,
746 db_pool: P,
747 worker: Arc<W>,
748 executed_at: DateTime<Utc>,
749 ) -> ExecutionProgress {
750 trace!("Ticking with {worker:?}");
751 let ffqns = super::extract_ffqns(worker.as_ref());
752 let executor = ExecTask::new(worker, config, clock_fn, db_pool, ffqns);
753 let mut execution_progress = executor.tick(executed_at).await.unwrap();
754 loop {
755 execution_progress
756 .executions
757 .retain(|(_, abort_handle)| !abort_handle.is_finished());
758 if execution_progress.executions.is_empty() {
759 return execution_progress;
760 }
761 tokio::time::sleep(Duration::from_millis(10)).await;
762 }
763 }
764
765 #[tokio::test]
766 async fn execute_simple_lifecycle_tick_based_mem() {
767 let created_at = Now.now();
768 let (_guard, db_pool) = Database::Memory.set_up().await;
769 execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
770 db_pool.close().await.unwrap();
771 }
772
773 #[cfg(not(madsim))]
774 #[tokio::test]
775 async fn execute_simple_lifecycle_tick_based_sqlite() {
776 let created_at = Now.now();
777 let (_guard, db_pool) = Database::Sqlite.set_up().await;
778 execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
779 db_pool.close().await.unwrap();
780 }
781
782 async fn execute_simple_lifecycle_tick_based<
783 DB: DbConnection + 'static,
784 P: DbPool<DB> + 'static,
785 C: ClockFn + 'static,
786 >(
787 pool: P,
788 clock_fn: C,
789 ) {
790 set_up();
791 let created_at = clock_fn.now();
792 let exec_config = ExecConfig {
793 batch_size: 1,
794 lock_expiry: Duration::from_secs(1),
795 tick_sleep: Duration::from_millis(100),
796 component_id: ComponentId::dummy_activity(),
797 task_limiter: None,
798 };
799
800 let execution_log = create_and_tick(
801 CreateAndTickConfig {
802 execution_id: ExecutionId::generate(),
803 created_at,
804 max_retries: 0,
805 executed_at: created_at,
806 retry_exp_backoff: Duration::ZERO,
807 },
808 clock_fn,
809 pool,
810 exec_config,
811 Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
812 SupportedFunctionReturnValue::None,
813 Version::new(2),
814 None,
815 ))),
816 tick_fn,
817 )
818 .await;
819 assert_matches!(
820 execution_log.events.get(2).unwrap(),
821 ExecutionEvent {
822 event: ExecutionEventInner::Finished {
823 result: Ok(SupportedFunctionReturnValue::None),
824 http_client_traces: None
825 },
826 created_at: _,
827 backtrace_id: None,
828 }
829 );
830 }
831
832 #[tokio::test]
833 async fn stochastic_execute_simple_lifecycle_task_based_mem() {
834 set_up();
835 let created_at = Now.now();
836 let clock_fn = ConstClock(created_at);
837 let (_guard, db_pool) = Database::Memory.set_up().await;
838 let exec_config = ExecConfig {
839 batch_size: 1,
840 lock_expiry: Duration::from_secs(1),
841 tick_sleep: Duration::ZERO,
842 component_id: ComponentId::dummy_activity(),
843 task_limiter: None,
844 };
845
846 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
847 SupportedFunctionReturnValue::None,
848 Version::new(2),
849 None,
850 )));
851 let exec_task = ExecTask::spawn_new(
852 worker.clone(),
853 exec_config.clone(),
854 clock_fn,
855 db_pool.clone(),
856 ExecutorId::generate(),
857 );
858
859 let execution_log = create_and_tick(
860 CreateAndTickConfig {
861 execution_id: ExecutionId::generate(),
862 created_at,
863 max_retries: 0,
864 executed_at: created_at,
865 retry_exp_backoff: Duration::ZERO,
866 },
867 clock_fn,
868 db_pool.clone(),
869 exec_config,
870 worker,
871 |_, _, _, _, _| async {
872 tokio::time::sleep(Duration::from_secs(1)).await; ExecutionProgress::default()
874 },
875 )
876 .await;
877 exec_task.close().await;
878 db_pool.close().await.unwrap();
879 assert_matches!(
880 execution_log.events.get(2).unwrap(),
881 ExecutionEvent {
882 event: ExecutionEventInner::Finished {
883 result: Ok(SupportedFunctionReturnValue::None),
884 http_client_traces: None
885 },
886 created_at: _,
887 backtrace_id: None,
888 }
889 );
890 }
891
    /// Parameters for `create_and_tick`: the execution to create and the instant to tick at.
    struct CreateAndTickConfig {
        execution_id: ExecutionId,
        // Used both as the `CreateRequest` creation time and as its `scheduled_at`.
        created_at: DateTime<Utc>,
        max_retries: u32,
        // Instant passed to the tick function.
        executed_at: DateTime<Utc>,
        retry_exp_backoff: Duration,
    }
899
900 async fn create_and_tick<
901 W: Worker,
902 C: ClockFn,
903 DB: DbConnection,
904 P: DbPool<DB>,
905 T: FnMut(ExecConfig, C, P, Arc<W>, DateTime<Utc>) -> F,
906 F: Future<Output = ExecutionProgress>,
907 >(
908 config: CreateAndTickConfig,
909 clock_fn: C,
910 db_pool: P,
911 exec_config: ExecConfig,
912 worker: Arc<W>,
913 mut tick: T,
914 ) -> ExecutionLog {
915 let db_connection = db_pool.connection();
917 db_connection
918 .create(CreateRequest {
919 created_at: config.created_at,
920 execution_id: config.execution_id.clone(),
921 ffqn: FFQN_SOME,
922 params: Params::empty(),
923 parent: None,
924 metadata: concepts::ExecutionMetadata::empty(),
925 scheduled_at: config.created_at,
926 retry_exp_backoff: config.retry_exp_backoff,
927 max_retries: config.max_retries,
928 component_id: ComponentId::dummy_activity(),
929 scheduled_by: None,
930 })
931 .await
932 .unwrap();
933 tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
935 let execution_log = db_connection.get(&config.execution_id).await.unwrap();
936 debug!("Execution history after tick: {execution_log:?}");
937 assert_matches!(
939 execution_log.events.first().unwrap(),
940 ExecutionEvent {
941 event: ExecutionEventInner::Created { .. },
942 created_at: actually_created_at,
943 backtrace_id: None,
944 }
945 if config.created_at == *actually_created_at
946 );
947 let locked_at = assert_matches!(
948 execution_log.events.get(1).unwrap(),
949 ExecutionEvent {
950 event: ExecutionEventInner::Locked { .. },
951 created_at: locked_at,
952 backtrace_id: None,
953 } if config.created_at <= *locked_at
954 => *locked_at
955 );
956 assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
957 event: _,
958 created_at: executed_at,
959 backtrace_id: None,
960 } if *executed_at >= locked_at);
961 execution_log
962 }
963
964 #[expect(clippy::too_many_lines)]
965 #[tokio::test]
966 async fn activity_trap_should_trigger_an_execution_retry() {
967 set_up();
968 let sim_clock = SimClock::default();
969 let (_guard, db_pool) = Database::Memory.set_up().await;
970 let exec_config = ExecConfig {
971 batch_size: 1,
972 lock_expiry: Duration::from_secs(1),
973 tick_sleep: Duration::ZERO,
974 component_id: ComponentId::dummy_activity(),
975 task_limiter: None,
976 };
977 let expected_reason = "error reason";
978 let expected_detail = "error detail";
979 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
980 WorkerError::ActivityTrap {
981 reason: expected_reason.to_string(),
982 trap_kind: concepts::TrapKind::Trap,
983 detail: expected_detail.to_string(),
984 version: Version::new(2),
985 http_client_traces: None,
986 },
987 )));
988 let retry_exp_backoff = Duration::from_millis(100);
989 debug!(now = %sim_clock.now(), "Creating an execution that should fail");
990 let execution_log = create_and_tick(
991 CreateAndTickConfig {
992 execution_id: ExecutionId::generate(),
993 created_at: sim_clock.now(),
994 max_retries: 1,
995 executed_at: sim_clock.now(),
996 retry_exp_backoff,
997 },
998 sim_clock.clone(),
999 db_pool.clone(),
1000 exec_config.clone(),
1001 worker,
1002 tick_fn,
1003 )
1004 .await;
1005 assert_eq!(3, execution_log.events.len());
1006 {
1007 let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
1008 &execution_log.events.get(2).unwrap(),
1009 ExecutionEvent {
1010 event: ExecutionEventInner::TemporarilyFailed {
1011 reason_inner,
1012 reason_full,
1013 detail,
1014 backoff_expires_at,
1015 http_client_traces: None,
1016 },
1017 created_at: at,
1018 backtrace_id: None,
1019 }
1020 => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
1021 );
1022 assert_eq!(expected_reason, reason_inner.deref());
1023 assert_eq!(
1024 format!("activity trap: {expected_reason}"),
1025 reason_full.deref()
1026 );
1027 assert_eq!(Some(expected_detail), detail.as_deref());
1028 assert_eq!(at, sim_clock.now());
1029 assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
1030 }
1031 let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
1032 std::sync::Mutex::new(IndexMap::from([(
1033 Version::new(4),
1034 (
1035 vec![],
1036 WorkerResult::Ok(SupportedFunctionReturnValue::None, Version::new(4), None),
1037 ),
1038 )])),
1039 )));
1040 assert!(tick_fn(
1042 exec_config.clone(),
1043 sim_clock.clone(),
1044 db_pool.clone(),
1045 worker.clone(),
1046 sim_clock.now(),
1047 )
1048 .await
1049 .executions
1050 .is_empty());
1051 sim_clock.move_time_forward(retry_exp_backoff).await;
1053 tick_fn(
1054 exec_config,
1055 sim_clock.clone(),
1056 db_pool.clone(),
1057 worker,
1058 sim_clock.now(),
1059 )
1060 .await;
1061 let execution_log = {
1062 let db_connection = db_pool.connection();
1063 db_connection
1064 .get(&execution_log.execution_id)
1065 .await
1066 .unwrap()
1067 };
1068 debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
1069 assert_matches!(
1070 execution_log.events.get(3).unwrap(),
1071 ExecutionEvent {
1072 event: ExecutionEventInner::Locked { .. },
1073 created_at: at,
1074 backtrace_id: None,
1075 } if *at == sim_clock.now()
1076 );
1077 assert_matches!(
1078 execution_log.events.get(4).unwrap(),
1079 ExecutionEvent {
1080 event: ExecutionEventInner::Finished {
1081 result: Ok(SupportedFunctionReturnValue::None),
1082 http_client_traces: None
1083 },
1084 created_at: finished_at,
1085 backtrace_id: None,
1086 } if *finished_at == sim_clock.now()
1087 );
1088 db_pool.close().await.unwrap();
1089 }
1090
1091 #[tokio::test]
1092 async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1093 set_up();
1094 let created_at = Now.now();
1095 let clock_fn = ConstClock(created_at);
1096 let (_guard, db_pool) = Database::Memory.set_up().await;
1097 let exec_config = ExecConfig {
1098 batch_size: 1,
1099 lock_expiry: Duration::from_secs(1),
1100 tick_sleep: Duration::ZERO,
1101 component_id: ComponentId::dummy_activity(),
1102 task_limiter: None,
1103 };
1104
1105 let expected_reason = "error reason";
1106 let expected_detail = "error detail";
1107 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1108 WorkerError::ActivityTrap {
1109 reason: expected_reason.to_string(),
1110 trap_kind: concepts::TrapKind::Trap,
1111 detail: expected_detail.to_string(),
1112 version: Version::new(2),
1113 http_client_traces: None,
1114 },
1115 )));
1116 let execution_log = create_and_tick(
1117 CreateAndTickConfig {
1118 execution_id: ExecutionId::generate(),
1119 created_at,
1120 max_retries: 0,
1121 executed_at: created_at,
1122 retry_exp_backoff: Duration::ZERO,
1123 },
1124 clock_fn,
1125 db_pool.clone(),
1126 exec_config.clone(),
1127 worker,
1128 tick_fn,
1129 )
1130 .await;
1131 assert_eq!(3, execution_log.events.len());
1132 let (reason, kind, detail) = assert_matches!(
1133 &execution_log.events.get(2).unwrap(),
1134 ExecutionEvent {
1135 event: ExecutionEventInner::Finished{
1136 result: Err(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_}),
1137 http_client_traces: None
1138 },
1139 created_at: at,
1140 backtrace_id: None,
1141 } if *at == created_at
1142 => (reason_inner, kind, detail)
1143 );
1144 assert_eq!(expected_reason, *reason);
1145 assert_eq!(Some(expected_detail), detail.as_deref());
1146 assert_eq!(PermanentFailureKind::ActivityTrap, *kind);
1147
1148 db_pool.close().await.unwrap();
1149 }
1150
1151 #[tokio::test]
1152 async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1153 let worker_error = WorkerError::ActivityTrap {
1154 reason: "error reason".to_string(),
1155 trap_kind: TrapKind::Trap,
1156 detail: "detail".to_string(),
1157 version: Version::new(2),
1158 http_client_traces: None,
1159 };
1160 let expected_child_err = FinishedExecutionError::PermanentFailure {
1161 reason_full: "activity trap: error reason".to_string(),
1162 reason_inner: "error reason".to_string(),
1163 kind: PermanentFailureKind::ActivityTrap,
1164 detail: Some("detail".to_string()),
1165 };
1166 child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
1167 .await;
1168 }
1169
1170 #[tokio::test]
1173 async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1174 let worker_error = WorkerError::TemporaryTimeoutHandledByWatcher;
1175 let expected_child_err = FinishedExecutionError::PermanentTimeout;
1176 child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
1177 .await;
1178 }
1179
1180 #[tokio::test]
1181 async fn child_execution_permanently_failed_should_notify_parent_unhandled_child() {
1182 let parent_id = ExecutionId::from_parts(1, 1);
1183 let join_set_id_outer =
1184 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("outer")).unwrap();
1185 let root_cause_id = parent_id.next_level(&join_set_id_outer);
1186 let join_set_id_inner =
1187 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("inner")).unwrap();
1188 let child_execution_id = root_cause_id.next_level(&join_set_id_inner);
1189 let worker_error = WorkerError::FatalError(
1190 FatalError::UnhandledChildExecutionError {
1191 child_execution_id: child_execution_id.clone(),
1192 root_cause_id: root_cause_id.clone(),
1193 },
1194 Version::new(2),
1195 );
1196 let expected_child_err = FinishedExecutionError::UnhandledChildExecutionError {
1197 child_execution_id,
1198 root_cause_id,
1199 };
1200 child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
1201 .await;
1202 }
1203
1204 #[expect(clippy::too_many_lines)]
1205 async fn child_execution_permanently_failed_should_notify_parent(
1206 worker_error: WorkerError,
1207 expected_child_err: FinishedExecutionError,
1208 ) {
1209 use concepts::storage::JoinSetResponseEventOuter;
1210 const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1211
1212 set_up();
1213 let sim_clock = SimClock::default();
1214 let (_guard, db_pool) = Database::Memory.set_up().await;
1215
1216 let parent_worker = Arc::new(SimpleWorker::with_single_result(
1217 WorkerResult::DbUpdatedByWorker,
1218 ));
1219 let parent_execution_id = ExecutionId::generate();
1220 db_pool
1221 .connection()
1222 .create(CreateRequest {
1223 created_at: sim_clock.now(),
1224 execution_id: parent_execution_id.clone(),
1225 ffqn: FFQN_SOME,
1226 params: Params::empty(),
1227 parent: None,
1228 metadata: concepts::ExecutionMetadata::empty(),
1229 scheduled_at: sim_clock.now(),
1230 retry_exp_backoff: Duration::ZERO,
1231 max_retries: 0,
1232 component_id: ComponentId::dummy_activity(),
1233 scheduled_by: None,
1234 })
1235 .await
1236 .unwrap();
1237 tick_fn(
1238 ExecConfig {
1239 batch_size: 1,
1240 lock_expiry: LOCK_EXPIRY,
1241 tick_sleep: Duration::ZERO,
1242 component_id: ComponentId::dummy_activity(),
1243 task_limiter: None,
1244 },
1245 sim_clock.clone(),
1246 db_pool.clone(),
1247 parent_worker,
1248 sim_clock.now(),
1249 )
1250 .await;
1251
1252 let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1253 let child_execution_id = parent_execution_id.next_level(&join_set_id);
1254 {
1256 let child = CreateRequest {
1257 created_at: sim_clock.now(),
1258 execution_id: ExecutionId::Derived(child_execution_id.clone()),
1259 ffqn: FFQN_CHILD,
1260 params: Params::empty(),
1261 parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1262 metadata: concepts::ExecutionMetadata::empty(),
1263 scheduled_at: sim_clock.now(),
1264 retry_exp_backoff: Duration::ZERO,
1265 max_retries: 0,
1266 component_id: ComponentId::dummy_activity(),
1267 scheduled_by: None,
1268 };
1269 let current_time = sim_clock.now();
1270 let join_set = AppendRequest {
1271 created_at: current_time,
1272 event: ExecutionEventInner::HistoryEvent {
1273 event: HistoryEvent::JoinSetCreate {
1274 join_set_id: join_set_id.clone(),
1275 closing_strategy: ClosingStrategy::Complete,
1276 },
1277 },
1278 };
1279 let child_exec_req = AppendRequest {
1280 created_at: current_time,
1281 event: ExecutionEventInner::HistoryEvent {
1282 event: HistoryEvent::JoinSetRequest {
1283 join_set_id: join_set_id.clone(),
1284 request: JoinSetRequest::ChildExecutionRequest {
1285 child_execution_id: child_execution_id.clone(),
1286 },
1287 },
1288 },
1289 };
1290 let join_next = AppendRequest {
1291 created_at: current_time,
1292 event: ExecutionEventInner::HistoryEvent {
1293 event: HistoryEvent::JoinNext {
1294 join_set_id: join_set_id.clone(),
1295 run_expires_at: sim_clock.now(),
1296 closing: false,
1297 },
1298 },
1299 };
1300 db_pool
1301 .connection()
1302 .append_batch_create_new_execution(
1303 current_time,
1304 vec![join_set, child_exec_req, join_next],
1305 parent_execution_id.clone(),
1306 Version::new(2),
1307 vec![child],
1308 )
1309 .await
1310 .unwrap();
1311 }
1312
1313 let child_worker = Arc::new(
1314 SimpleWorker::with_single_result(WorkerResult::Err(worker_error)).with_ffqn(FFQN_CHILD),
1315 );
1316
1317 tick_fn(
1319 ExecConfig {
1320 batch_size: 1,
1321 lock_expiry: LOCK_EXPIRY,
1322 tick_sleep: Duration::ZERO,
1323 component_id: ComponentId::dummy_activity(),
1324 task_limiter: None,
1325 },
1326 sim_clock.clone(),
1327 db_pool.clone(),
1328 child_worker,
1329 sim_clock.now(),
1330 )
1331 .await;
1332 if matches!(
1333 expected_child_err,
1334 FinishedExecutionError::PermanentTimeout { .. }
1335 ) {
1336 sim_clock.move_time_forward(LOCK_EXPIRY).await;
1338 expired_timers_watcher::tick(&db_pool.connection(), sim_clock.now())
1339 .await
1340 .unwrap();
1341 }
1342 let child_log = db_pool
1343 .connection()
1344 .get(&ExecutionId::Derived(child_execution_id.clone()))
1345 .await
1346 .unwrap();
1347 assert!(child_log.pending_state.is_finished());
1348 assert_eq!(
1349 Version(2),
1350 child_log.next_version,
1351 "created = 0, locked = 1, with_single_result = 2"
1352 );
1353 assert_eq!(
1354 ExecutionEventInner::Finished {
1355 result: Err(expected_child_err),
1356 http_client_traces: None
1357 },
1358 child_log.last_event().event
1359 );
1360 let parent_log = db_pool
1361 .connection()
1362 .get(&parent_execution_id)
1363 .await
1364 .unwrap();
1365 assert_matches!(
1366 parent_log.pending_state,
1367 PendingState::PendingAt {
1368 scheduled_at
1369 } if scheduled_at == sim_clock.now(),
1370 "parent should be back to pending"
1371 );
1372 let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1373 parent_log.responses.last(),
1374 Some(JoinSetResponseEventOuter{
1375 created_at: at,
1376 event: JoinSetResponseEvent{
1377 join_set_id: found_join_set_id,
1378 event: JoinSetResponse::ChildExecutionFinished {
1379 child_execution_id: found_child_execution_id,
1380 finished_version,
1381 result: found_result,
1382 }
1383 }
1384 })
1385 if *at == sim_clock.now()
1386 => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1387 );
1388 assert_eq!(join_set_id, *found_join_set_id);
1389 assert_eq!(child_execution_id, *found_child_execution_id);
1390 assert_eq!(child_log.next_version, *child_finished_version);
1391 assert!(found_result.is_err());
1392
1393 db_pool.close().await.unwrap();
1394 }
1395
1396 #[derive(Clone, Debug)]
1397 struct SleepyWorker {
1398 duration: Duration,
1399 result: SupportedFunctionReturnValue,
1400 exported: [FunctionMetadata; 1],
1401 }
1402
1403 #[async_trait]
1404 impl Worker for SleepyWorker {
1405 async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1406 tokio::time::sleep(self.duration).await;
1407 WorkerResult::Ok(self.result.clone(), ctx.version, None)
1408 }
1409
1410 fn exported_functions(&self) -> &[FunctionMetadata] {
1411 &self.exported
1412 }
1413
1414 fn imported_functions(&self) -> &[FunctionMetadata] {
1415 &[]
1416 }
1417 }
1418
1419 #[expect(clippy::too_many_lines)]
1420 #[tokio::test]
1421 async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1422 set_up();
1423 let sim_clock = SimClock::default();
1424 let (_guard, db_pool) = Database::Memory.set_up().await;
1425 let lock_expiry = Duration::from_millis(100);
1426 let exec_config = ExecConfig {
1427 batch_size: 1,
1428 lock_expiry,
1429 tick_sleep: Duration::ZERO,
1430 component_id: ComponentId::dummy_activity(),
1431 task_limiter: None,
1432 };
1433
1434 let worker = Arc::new(SleepyWorker {
1435 duration: lock_expiry + Duration::from_millis(1), result: SupportedFunctionReturnValue::None,
1437 exported: [FunctionMetadata {
1438 ffqn: FFQN_SOME,
1439 parameter_types: ParameterTypes::default(),
1440 return_type: None,
1441 extension: None,
1442 submittable: true,
1443 }],
1444 });
1445 let execution_id = ExecutionId::generate();
1447 let timeout_duration = Duration::from_millis(300);
1448 let db_connection = db_pool.connection();
1449 db_connection
1450 .create(CreateRequest {
1451 created_at: sim_clock.now(),
1452 execution_id: execution_id.clone(),
1453 ffqn: FFQN_SOME,
1454 params: Params::empty(),
1455 parent: None,
1456 metadata: concepts::ExecutionMetadata::empty(),
1457 scheduled_at: sim_clock.now(),
1458 retry_exp_backoff: timeout_duration,
1459 max_retries: 1,
1460 component_id: ComponentId::dummy_activity(),
1461 scheduled_by: None,
1462 })
1463 .await
1464 .unwrap();
1465
1466 let ffqns = super::extract_ffqns(worker.as_ref());
1467 let executor = ExecTask::new(
1468 worker,
1469 exec_config,
1470 sim_clock.clone(),
1471 db_pool.clone(),
1472 ffqns,
1473 );
1474 let mut first_execution_progress = executor.tick(sim_clock.now()).await.unwrap();
1475 assert_eq!(1, first_execution_progress.executions.len());
1476 sim_clock.move_time_forward(lock_expiry).await;
1478 let now_after_first_lock_expiry = sim_clock.now();
1480 {
1481 debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1482 let cleanup_progress = executor.tick(now_after_first_lock_expiry).await.unwrap();
1483 assert!(cleanup_progress.executions.is_empty());
1484 }
1485 {
1486 let expired_locks =
1487 expired_timers_watcher::tick(&db_pool.connection(), now_after_first_lock_expiry)
1488 .await
1489 .unwrap()
1490 .expired_locks;
1491 assert_eq!(1, expired_locks);
1492 }
1493 assert!(!first_execution_progress
1494 .executions
1495 .pop()
1496 .unwrap()
1497 .1
1498 .is_finished());
1499
1500 let execution_log = db_connection.get(&execution_id).await.unwrap();
1501 let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1502 assert_matches!(
1503 &execution_log.events.get(2).unwrap(),
1504 ExecutionEvent {
1505 event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
1506 created_at: at,
1507 backtrace_id: None,
1508 } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1509 );
1510 assert_eq!(
1511 PendingState::PendingAt {
1512 scheduled_at: expected_first_timeout_expiry
1513 },
1514 execution_log.pending_state
1515 );
1516 sim_clock.move_time_forward(timeout_duration).await;
1517 let now_after_first_timeout = sim_clock.now();
1518 debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1519
1520 let mut second_execution_progress = executor.tick(now_after_first_timeout).await.unwrap();
1521 assert_eq!(1, second_execution_progress.executions.len());
1522
1523 sim_clock.move_time_forward(lock_expiry).await;
1525 let now_after_second_lock_expiry = sim_clock.now();
1527 debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1528 {
1529 let cleanup_progress = executor.tick(now_after_second_lock_expiry).await.unwrap();
1530 assert!(cleanup_progress.executions.is_empty());
1531 }
1532 {
1533 let expired_locks =
1534 expired_timers_watcher::tick(&db_pool.connection(), now_after_second_lock_expiry)
1535 .await
1536 .unwrap()
1537 .expired_locks;
1538 assert_eq!(1, expired_locks);
1539 }
1540 assert!(!second_execution_progress
1541 .executions
1542 .pop()
1543 .unwrap()
1544 .1
1545 .is_finished());
1546
1547 drop(db_connection);
1548 drop(executor);
1549 db_pool.close().await.unwrap();
1550 }
1551}