1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::storage::{
5 AppendRequest, DbPool, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
6 LockedExecution,
7};
8use concepts::time::ClockFn;
9use concepts::{ComponentId, FinishedExecutionResult, FunctionMetadata, StrVariant};
10use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
11use concepts::{
12 FinishedExecutionError,
13 storage::{DbConnection, DbError, ExecutionEventInner, JoinSetResponse, Version},
14};
15use concepts::{JoinSetId, PermanentFailureKind};
16use std::marker::PhantomData;
17use std::{
18 sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21 },
22 time::Duration,
23};
24use tokio::task::{AbortHandle, JoinHandle};
25use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
26
/// Configuration of a single executor (one `ExecTask` polling loop).
#[derive(Debug, Clone)]
pub struct ExecConfig {
    /// Added to a tick's `executed_at` to obtain both the lock expiry passed to
    /// `lock_pending` and the execution deadline handed to the worker.
    pub lock_expiry: Duration,
    /// Passed to `wait_for_pending` by the polling loop between ticks.
    pub tick_sleep: Duration,
    /// Maximum number of pending executions locked in a single tick.
    pub batch_size: u32,
    /// Component id passed to `lock_pending` and attached to logs and spans.
    pub component_id: ComponentId,
    /// Optional semaphore bounding concurrently running worker tasks. Permits are taken
    /// without waiting before locking and are released when the worker task ends; `None`
    /// means no limit beyond `batch_size` per tick.
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
}
35
/// Executor that locks pending executions of its functions and runs each of them on
/// `worker` in a separately spawned tokio task.
pub struct ExecTask<C: ClockFn, DB: DbConnection, P: DbPool<DB>> {
    worker: Arc<dyn Worker>,
    config: ExecConfig,
    /// Source of the current time for ticks and for result timestamps.
    clock_fn: C,
    db_pool: P,
    executor_id: ExecutorId,
    // `DB` only appears in the `P: DbPool<DB>` bound, so it needs a marker field.
    phantom_data: PhantomData<DB>,
    /// Functions whose pending executions this executor locks (passed to `lock_pending`
    /// and `wait_for_pending`).
    ffqns: Arc<[FunctionFqn]>,
}
45
/// Worker tasks spawned by a single executor tick.
#[derive(derive_more::Debug, Default)]
pub struct ExecutionProgress {
    /// Each locked execution paired with the handle of the task running its worker.
    #[debug(skip)]
    // Only read by `wait_for_tasks` (feature "test") and by the unit tests; other builds
    // would otherwise report the field as dead code.
    #[allow(dead_code)]
    executions: Vec<(ExecutionId, JoinHandle<()>)>,
}
52
53impl ExecutionProgress {
54 #[cfg(feature = "test")]
55 pub async fn wait_for_tasks(self) -> Result<usize, tokio::task::JoinError> {
56 let execs = self.executions.len();
57 for (_, handle) in self.executions {
58 handle.await?;
59 }
60 Ok(execs)
61 }
62}
63
/// Handle to the polling task started by `ExecTask::spawn_new`.
///
/// `close` shuts the task down gracefully; dropping the handle while the task is still
/// running aborts it.
#[derive(derive_more::Debug)]
pub struct ExecutorTaskHandle {
    /// Shutdown flag shared with the polling loop.
    #[debug(skip)]
    is_closing: Arc<AtomicBool>,
    /// Abort handle of the spawned polling task.
    #[debug(skip)]
    abort_handle: AbortHandle,
    component_id: ComponentId,
    executor_id: ExecutorId,
}
73
74impl ExecutorTaskHandle {
75 #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
76 pub async fn close(&self) {
77 trace!("Gracefully closing");
78 self.is_closing.store(true, Ordering::Relaxed);
79 while !self.abort_handle.is_finished() {
80 tokio::time::sleep(Duration::from_millis(1)).await;
81 }
82 debug!("Gracefully closed");
83 }
84}
85
86impl Drop for ExecutorTaskHandle {
87 #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
88 fn drop(&mut self) {
89 if self.abort_handle.is_finished() {
90 return;
91 }
92 warn!(executor_id= %self.executor_id, component_id=%self.component_id, "Aborting the executor task");
93 self.abort_handle.abort();
94 }
95}
96
/// Public wrapper around `extract_ffqns`, available only with the `test` feature.
#[cfg(feature = "test")]
pub fn extract_ffqns_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    extract_ffqns(worker)
}
101
102pub(crate) fn extract_ffqns(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
103 worker
104 .exported_functions()
105 .iter()
106 .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
107 .collect::<Arc<_>>()
108}
109
110impl<C: ClockFn + 'static, DB: DbConnection + 'static, P: DbPool<DB> + 'static> ExecTask<C, DB, P> {
111 #[cfg(feature = "test")]
112 pub fn new(
113 worker: Arc<dyn Worker>,
114 config: ExecConfig,
115 clock_fn: C,
116 db_pool: P,
117 ffqns: Arc<[FunctionFqn]>,
118 ) -> Self {
119 Self {
120 worker,
121 config,
122 executor_id: ExecutorId::generate(),
123 db_pool,
124 phantom_data: PhantomData,
125 ffqns,
126 clock_fn,
127 }
128 }
129
130 pub fn spawn_new(
131 worker: Arc<dyn Worker>,
132 config: ExecConfig,
133 clock_fn: C,
134 db_pool: P,
135 executor_id: ExecutorId,
136 ) -> ExecutorTaskHandle {
137 let is_closing = Arc::new(AtomicBool::default());
138 let is_closing_inner = is_closing.clone();
139 let ffqns = extract_ffqns(worker.as_ref());
140 let component_id = config.component_id.clone();
141 let abort_handle = tokio::spawn(async move {
142 debug!(%executor_id, component_id = %config.component_id, "Spawned executor");
143 let task = Self {
144 worker,
145 config,
146 executor_id,
147 db_pool,
148 phantom_data: PhantomData,
149 ffqns: ffqns.clone(),
150 clock_fn: clock_fn.clone(),
151 };
152 loop {
153 let _ = task.tick(clock_fn.now()).await;
154 let executed_at = clock_fn.now();
155 task.db_pool
156 .connection()
157 .wait_for_pending(executed_at, ffqns.clone(), task.config.tick_sleep)
158 .await;
159 if is_closing_inner.load(Ordering::Relaxed) {
160 return;
161 }
162 }
163 })
164 .abort_handle();
165 ExecutorTaskHandle {
166 is_closing,
167 abort_handle,
168 component_id,
169 executor_id,
170 }
171 }
172
173 fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
174 if let Some(task_limiter) = &self.config.task_limiter {
175 let mut locks = Vec::new();
176 for _ in 0..self.config.batch_size {
177 if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
178 locks.push(Some(permit));
179 } else {
180 break;
181 }
182 }
183 locks
184 } else {
185 let mut vec = Vec::with_capacity(self.config.batch_size as usize);
186 for _ in 0..self.config.batch_size {
187 vec.push(None);
188 }
189 vec
190 }
191 }
192
193 #[cfg(feature = "test")]
194 pub async fn tick_test(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
195 self.tick(executed_at).await
196 }
197
198 #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.executor_id, component_id = %self.config.component_id))]
199 async fn tick(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
200 let locked_executions = {
201 let mut permits = self.acquire_task_permits();
202 if permits.is_empty() {
203 return Ok(ExecutionProgress::default());
204 }
205 let db_connection = self.db_pool.connection();
206 let lock_expires_at = executed_at + self.config.lock_expiry;
207 let locked_executions = db_connection
208 .lock_pending(
209 permits.len(), executed_at, self.ffqns.clone(),
212 executed_at, self.config.component_id.clone(),
214 self.executor_id,
215 lock_expires_at,
216 )
217 .await
218 .map_err(|err| {
219 warn!(executor_id = %self.executor_id, component_id = %self.config.component_id, "lock_pending error {err:?}");
220 })?;
221 while permits.len() > locked_executions.len() {
223 permits.pop();
224 }
225 assert_eq!(permits.len(), locked_executions.len());
226 locked_executions.into_iter().zip(permits)
227 };
228 let execution_deadline = executed_at + self.config.lock_expiry;
229
230 let mut executions = Vec::with_capacity(locked_executions.len());
231 for (locked_execution, permit) in locked_executions {
232 let execution_id = locked_execution.execution_id.clone();
233 let join_handle = {
234 let worker = self.worker.clone();
235 let db_pool = self.db_pool.clone();
236 let clock_fn = self.clock_fn.clone();
237 let run_id = locked_execution.run_id;
238 let worker_span = info_span!(parent: None, "worker",
239 "otel.name" = format!("worker {}", locked_execution.ffqn),
240 %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.executor_id, component_id = %self.config.component_id);
241 locked_execution.metadata.enrich(&worker_span);
242 tokio::spawn({
243 let worker_span2 = worker_span.clone();
244 async move {
245 let res = Self::run_worker(
246 worker,
247 &db_pool,
248 execution_deadline,
249 clock_fn,
250 locked_execution,
251 worker_span2,
252 )
253 .await;
254 if let Err(db_error) = res {
255 error!("Execution will be timed out not writing `{db_error:?}`");
256 }
257 drop(permit);
258 }
259 .instrument(worker_span)
260 })
261 };
262 executions.push((execution_id, join_handle));
263 }
264 Ok(ExecutionProgress { executions })
265 }
266
267 async fn run_worker(
268 worker: Arc<dyn Worker>,
269 db_pool: &P,
270 execution_deadline: DateTime<Utc>,
271 clock_fn: C,
272 locked_execution: LockedExecution,
273 worker_span: Span,
274 ) -> Result<(), DbError> {
275 debug!("Worker::run starting");
276 trace!(
277 version = %locked_execution.version,
278 params = ?locked_execution.params,
279 event_history = ?locked_execution.event_history,
280 "Worker::run starting"
281 );
282 let can_be_retried = ExecutionLog::can_be_retried_after(
283 locked_execution.temporary_event_count + 1,
284 locked_execution.max_retries,
285 locked_execution.retry_exp_backoff,
286 );
287 let unlock_expiry_on_limit_reached =
288 ExecutionLog::compute_retry_duration_when_retrying_forever(
289 locked_execution.temporary_event_count + 1,
290 locked_execution.retry_exp_backoff,
291 );
292 let ctx = WorkerContext {
293 execution_id: locked_execution.execution_id.clone(),
294 metadata: locked_execution.metadata,
295 ffqn: locked_execution.ffqn,
296 params: locked_execution.params,
297 event_history: locked_execution.event_history,
298 responses: locked_execution
299 .responses
300 .into_iter()
301 .map(|outer| outer.event)
302 .collect(),
303 version: locked_execution.version,
304 execution_deadline,
305 can_be_retried: can_be_retried.is_some(),
306 run_id: locked_execution.run_id,
307 worker_span,
308 };
309 let worker_result = worker.run(ctx).await;
310 trace!(?worker_result, "Worker::run finished");
311 let result_obtained_at = clock_fn.now();
312 match Self::worker_result_to_execution_event(
313 locked_execution.execution_id,
314 worker_result,
315 result_obtained_at,
316 locked_execution.parent,
317 can_be_retried,
318 unlock_expiry_on_limit_reached,
319 )? {
320 Some(append) => {
321 let db_connection = db_pool.connection();
322 trace!("Appending {append:?}");
323 append.clone().append(&db_connection).await
324 }
325 None => Ok(()),
326 }
327 }
328
329 #[expect(clippy::too_many_lines)]
332 fn worker_result_to_execution_event(
333 execution_id: ExecutionId,
334 worker_result: WorkerResult,
335 result_obtained_at: DateTime<Utc>,
336 parent: Option<(ExecutionId, JoinSetId)>,
337 can_be_retried: Option<Duration>,
338 unlock_expiry_on_limit_reached: Duration,
339 ) -> Result<Option<Append>, DbError> {
340 Ok(match worker_result {
341 WorkerResult::Ok(result, new_version, http_client_traces) => {
342 info!(
343 "Execution finished: {}",
344 result.as_pending_state_finished_result()
345 );
346 let child_finished =
347 parent.map(
348 |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
349 parent_execution_id,
350 parent_join_set,
351 result: Ok(result.clone()),
352 },
353 );
354 let primary_event = AppendRequest {
355 created_at: result_obtained_at,
356 event: ExecutionEventInner::Finished {
357 result: Ok(result),
358 http_client_traces,
359 },
360 };
361
362 Some(Append {
363 created_at: result_obtained_at,
364 primary_event,
365 execution_id,
366 version: new_version,
367 child_finished,
368 })
369 }
370 WorkerResult::DbUpdatedByWorkerOrWatcher => None,
371 WorkerResult::Err(err) => {
372 let reason = err.to_string();
373 let (primary_event, child_finished, version) = match err {
374 WorkerError::TemporaryTimeout {
375 http_client_traces,
376 version,
377 } => {
378 if let Some(duration) = can_be_retried {
379 let backoff_expires_at = result_obtained_at + duration;
380 info!(
381 "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
382 );
383 (
384 ExecutionEventInner::TemporarilyTimedOut {
385 backoff_expires_at,
386 http_client_traces,
387 },
388 None,
389 version,
390 )
391 } else {
392 info!("Permanent timeout");
393 let result = Err(FinishedExecutionError::PermanentTimeout);
394 let child_finished =
395 parent.map(|(parent_execution_id, parent_join_set)| {
396 ChildFinishedResponse {
397 parent_execution_id,
398 parent_join_set,
399 result: result.clone(),
400 }
401 });
402 (
403 ExecutionEventInner::Finished {
404 result,
405 http_client_traces,
406 },
407 child_finished,
408 version,
409 )
410 }
411 }
412 WorkerError::DbError(db_error) => {
413 return Err(db_error);
414 }
415 WorkerError::ActivityTrap {
416 reason: reason_inner,
417 trap_kind,
418 detail,
419 version,
420 http_client_traces,
421 } => {
422 if let Some(duration) = can_be_retried {
423 let expires_at = result_obtained_at + duration;
424 debug!(
425 "Retrying activity {trap_kind} execution after {duration:?} at {expires_at}"
426 );
427 (
428 ExecutionEventInner::TemporarilyFailed {
429 backoff_expires_at: expires_at,
430 reason_full: StrVariant::from(reason),
431 reason_inner: StrVariant::from(reason_inner),
432 detail: Some(detail),
433 http_client_traces,
434 },
435 None,
436 version,
437 )
438 } else {
439 info!(
440 "Activity {trap_kind} marked as permanent failure - {reason_inner}"
441 );
442 let result = Err(FinishedExecutionError::PermanentFailure {
443 reason_inner,
444 reason_full: reason,
445 kind: PermanentFailureKind::ActivityTrap,
446 detail: Some(detail),
447 });
448 let child_finished =
449 parent.map(|(parent_execution_id, parent_join_set)| {
450 ChildFinishedResponse {
451 parent_execution_id,
452 parent_join_set,
453 result: result.clone(),
454 }
455 });
456 (
457 ExecutionEventInner::Finished {
458 result,
459 http_client_traces,
460 },
461 child_finished,
462 version,
463 )
464 }
465 }
466 WorkerError::ActivityReturnedError {
467 detail,
468 version,
469 http_client_traces,
470 } => {
471 let duration = can_be_retried.expect(
472 "ActivityReturnedError must not be returned when retries are exhausted",
473 );
474 let expires_at = result_obtained_at + duration;
475 debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
476 (
477 ExecutionEventInner::TemporarilyFailed {
478 backoff_expires_at: expires_at,
479 reason_full: StrVariant::Static("activity returned error"),
480 reason_inner: StrVariant::Static("activity returned error"),
481 detail,
482 http_client_traces,
483 },
484 None,
485 version,
486 )
487 }
488 WorkerError::TemporaryWorkflowTrap {
489 reason: reason_inner,
490 kind,
491 detail,
492 version,
493 } => {
494 let duration = can_be_retried.expect("workflows are retried forever");
495 let expires_at = result_obtained_at + duration;
496 debug!(
497 "Retrying workflow {kind} execution after {duration:?} at {expires_at}"
498 );
499 (
500 ExecutionEventInner::TemporarilyFailed {
501 backoff_expires_at: expires_at,
502 reason_full: StrVariant::from(reason),
503 reason_inner: StrVariant::from(reason_inner),
504 detail,
505 http_client_traces: None,
506 },
507 None,
508 version,
509 )
510 }
511 WorkerError::LimitReached {
512 reason: inner_reason,
513 version: new_version,
514 } => {
515 let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
516 warn!(
517 "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
518 );
519 (
520 ExecutionEventInner::Unlocked {
521 backoff_expires_at: expires_at,
522 reason: StrVariant::from(reason),
523 },
524 None,
525 new_version,
526 )
527 }
528 WorkerError::FatalError(fatal_error, version) => {
529 info!("Fatal worker error - {fatal_error:?}");
530 let result = Err(FinishedExecutionError::from(fatal_error));
531 let child_finished =
532 parent.map(|(parent_execution_id, parent_join_set)| {
533 ChildFinishedResponse {
534 parent_execution_id,
535 parent_join_set,
536 result: result.clone(),
537 }
538 });
539 (
540 ExecutionEventInner::Finished {
541 result,
542 http_client_traces: None,
543 },
544 child_finished,
545 version,
546 )
547 }
548 };
549 Some(Append {
550 created_at: result_obtained_at,
551 primary_event: AppendRequest {
552 created_at: result_obtained_at,
553 event: primary_event,
554 },
555 execution_id,
556 version,
557 child_finished,
558 })
559 }
560 })
561 }
562}
563
/// Response delivered to the parent execution when a child execution finishes.
#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    /// Execution that receives the response.
    pub(crate) parent_execution_id: ExecutionId,
    /// Join set of the parent that the response is delivered to.
    pub(crate) parent_join_set: JoinSetId,
    /// The child's final result.
    pub(crate) result: FinishedExecutionResult,
}
570
/// One event to append to an execution's log, plus an optional response for its parent.
#[derive(Debug, Clone)]
pub(crate) struct Append {
    /// Timestamp used for the append and for the parent response, if any.
    pub(crate) created_at: DateTime<Utc>,
    pub(crate) primary_event: AppendRequest,
    pub(crate) execution_id: ExecutionId,
    /// Version passed to the append; also reported to the parent as `finished_version`.
    pub(crate) version: Version,
    /// When set, `append` requires the primary event to be `Finished` and the execution
    /// id to be `ExecutionId::Derived` (both asserted).
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}
579
580impl Append {
581 pub(crate) async fn append(self, db_connection: &impl DbConnection) -> Result<(), DbError> {
582 if let Some(child_finished) = self.child_finished {
583 assert_matches!(
584 &self.primary_event,
585 AppendRequest {
586 event: ExecutionEventInner::Finished { .. },
587 ..
588 }
589 );
590 let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
591 db_connection
592 .append_batch_respond_to_parent(
593 derived.clone(),
594 self.created_at,
595 vec![self.primary_event],
596 self.version.clone(),
597 child_finished.parent_execution_id,
598 JoinSetResponseEventOuter {
599 created_at: self.created_at,
600 event: JoinSetResponseEvent {
601 join_set_id: child_finished.parent_join_set,
602 event: JoinSetResponse::ChildExecutionFinished {
603 child_execution_id: derived,
604 finished_version: self.version,
606 result: child_finished.result,
607 },
608 },
609 },
610 )
611 .await?;
612 } else {
613 db_connection
614 .append_batch(
615 self.created_at,
616 vec![self.primary_event],
617 self.execution_id,
618 self.version,
619 )
620 .await?;
621 }
622 Ok(())
623 }
624}
625
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");
    /// Scripted runs keyed by the version the worker expects in its context. Each value
    /// holds the expected event history and the result to return. Entries are consumed
    /// from the end (`IndexMap::pop`).
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    /// Test worker that replays scripted results and asserts the context of each run.
    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    /// Metadata for the single exported function: default parameter types, no return
    /// type, no extension, submittable.
    fn exported_metadata(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
        [FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: None,
            extension: None,
            submittable: true,
        }]
    }

    impl SimpleWorker {
        /// Worker that runs once, expecting version 2 and an empty event history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(IndexMap::from([(
                Version::new(2),
                (vec![], res),
            )]))))
        }

        /// Replaces the exported function name, keeping the scripted results.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            Self {
                worker_results_rev: self.worker_results_rev,
                exported: exported_metadata(ffqn.clone()),
                ffqn,
            }
        }

        /// Worker exporting `FFQN_SOME` that replays `worker_results_rev`.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: exported_metadata(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        /// Pops the last scripted entry and asserts that the context's version and event
        /// history match it before returning its result.
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            // The mutex guard is a temporary dropped at the end of this statement.
            let (expected_version, (expected_eh, worker_result)) = self
                .worker_results_rev
                .lock()
                .unwrap()
                .pop()
                .expect("SimpleWorker was run more times than it has scripted results");
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }

        fn imported_functions(&self) -> &[FunctionMetadata] {
            &[]
        }
    }
}
709
710#[cfg(test)]
711mod tests {
712 use self::simple_worker::SimpleWorker;
713 use super::*;
714 use crate::worker::FatalError;
715 use crate::{expired_timers_watcher, worker::WorkerResult};
716 use assert_matches::assert_matches;
717 use async_trait::async_trait;
718 use concepts::storage::{CreateRequest, JoinSetRequest};
719 use concepts::storage::{
720 DbConnection, ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState,
721 };
722 use concepts::time::Now;
723 use concepts::{
724 ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, StrVariant,
725 SupportedFunctionReturnValue, TrapKind,
726 };
727 use db_tests::Database;
728 use indexmap::IndexMap;
729 use simple_worker::FFQN_SOME;
730 use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
731 use test_utils::set_up;
732 use test_utils::sim_clock::{ConstClock, SimClock};
733
    /// Function name for child executions; presumably used by the parent/child
    /// notification tests further down in this module.
    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
735
736 async fn tick_fn<
737 W: Worker + Debug,
738 C: ClockFn + 'static,
739 DB: DbConnection + 'static,
740 P: DbPool<DB> + 'static,
741 >(
742 config: ExecConfig,
743 clock_fn: C,
744 db_pool: P,
745 worker: Arc<W>,
746 executed_at: DateTime<Utc>,
747 ) -> ExecutionProgress {
748 trace!("Ticking with {worker:?}");
749 let ffqns = super::extract_ffqns(worker.as_ref());
750 let executor = ExecTask::new(worker, config, clock_fn, db_pool, ffqns);
751 let mut execution_progress = executor.tick(executed_at).await.unwrap();
752 loop {
753 execution_progress
754 .executions
755 .retain(|(_, abort_handle)| !abort_handle.is_finished());
756 if execution_progress.executions.is_empty() {
757 return execution_progress;
758 }
759 tokio::time::sleep(Duration::from_millis(10)).await;
760 }
761 }
762
763 #[tokio::test]
764 async fn execute_simple_lifecycle_tick_based_mem() {
765 let created_at = Now.now();
766 let (_guard, db_pool) = Database::Memory.set_up().await;
767 execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
768 db_pool.close().await.unwrap();
769 }
770
771 #[cfg(not(madsim))]
772 #[tokio::test]
773 async fn execute_simple_lifecycle_tick_based_sqlite() {
774 let created_at = Now.now();
775 let (_guard, db_pool) = Database::Sqlite.set_up().await;
776 execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
777 db_pool.close().await.unwrap();
778 }
779
780 async fn execute_simple_lifecycle_tick_based<
781 DB: DbConnection + 'static,
782 P: DbPool<DB> + 'static,
783 C: ClockFn + 'static,
784 >(
785 pool: P,
786 clock_fn: C,
787 ) {
788 set_up();
789 let created_at = clock_fn.now();
790 let exec_config = ExecConfig {
791 batch_size: 1,
792 lock_expiry: Duration::from_secs(1),
793 tick_sleep: Duration::from_millis(100),
794 component_id: ComponentId::dummy_activity(),
795 task_limiter: None,
796 };
797
798 let execution_log = create_and_tick(
799 CreateAndTickConfig {
800 execution_id: ExecutionId::generate(),
801 created_at,
802 max_retries: 0,
803 executed_at: created_at,
804 retry_exp_backoff: Duration::ZERO,
805 },
806 clock_fn,
807 pool,
808 exec_config,
809 Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
810 SupportedFunctionReturnValue::None,
811 Version::new(2),
812 None,
813 ))),
814 tick_fn,
815 )
816 .await;
817 assert_matches!(
818 execution_log.events.get(2).unwrap(),
819 ExecutionEvent {
820 event: ExecutionEventInner::Finished {
821 result: Ok(SupportedFunctionReturnValue::None),
822 http_client_traces: None
823 },
824 created_at: _,
825 backtrace_id: None,
826 }
827 );
828 }
829
830 #[tokio::test]
831 async fn stochastic_execute_simple_lifecycle_task_based_mem() {
832 set_up();
833 let created_at = Now.now();
834 let clock_fn = ConstClock(created_at);
835 let (_guard, db_pool) = Database::Memory.set_up().await;
836 let exec_config = ExecConfig {
837 batch_size: 1,
838 lock_expiry: Duration::from_secs(1),
839 tick_sleep: Duration::ZERO,
840 component_id: ComponentId::dummy_activity(),
841 task_limiter: None,
842 };
843
844 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
845 SupportedFunctionReturnValue::None,
846 Version::new(2),
847 None,
848 )));
849 let exec_task = ExecTask::spawn_new(
850 worker.clone(),
851 exec_config.clone(),
852 clock_fn,
853 db_pool.clone(),
854 ExecutorId::generate(),
855 );
856
857 let execution_log = create_and_tick(
858 CreateAndTickConfig {
859 execution_id: ExecutionId::generate(),
860 created_at,
861 max_retries: 0,
862 executed_at: created_at,
863 retry_exp_backoff: Duration::ZERO,
864 },
865 clock_fn,
866 db_pool.clone(),
867 exec_config,
868 worker,
869 |_, _, _, _, _| async {
870 tokio::time::sleep(Duration::from_secs(1)).await; ExecutionProgress::default()
872 },
873 )
874 .await;
875 exec_task.close().await;
876 db_pool.close().await.unwrap();
877 assert_matches!(
878 execution_log.events.get(2).unwrap(),
879 ExecutionEvent {
880 event: ExecutionEventInner::Finished {
881 result: Ok(SupportedFunctionReturnValue::None),
882 http_client_traces: None
883 },
884 created_at: _,
885 backtrace_id: None,
886 }
887 );
888 }
889
    /// Parameters of the execution created by `create_and_tick`.
    struct CreateAndTickConfig {
        execution_id: ExecutionId,
        /// Used as both the creation time and the scheduling time of the execution.
        created_at: DateTime<Utc>,
        max_retries: u32,
        /// Time passed to the tick function.
        executed_at: DateTime<Utc>,
        retry_exp_backoff: Duration,
    }
897
898 async fn create_and_tick<
899 W: Worker,
900 C: ClockFn,
901 DB: DbConnection,
902 P: DbPool<DB>,
903 T: FnMut(ExecConfig, C, P, Arc<W>, DateTime<Utc>) -> F,
904 F: Future<Output = ExecutionProgress>,
905 >(
906 config: CreateAndTickConfig,
907 clock_fn: C,
908 db_pool: P,
909 exec_config: ExecConfig,
910 worker: Arc<W>,
911 mut tick: T,
912 ) -> ExecutionLog {
913 let db_connection = db_pool.connection();
915 db_connection
916 .create(CreateRequest {
917 created_at: config.created_at,
918 execution_id: config.execution_id.clone(),
919 ffqn: FFQN_SOME,
920 params: Params::empty(),
921 parent: None,
922 metadata: concepts::ExecutionMetadata::empty(),
923 scheduled_at: config.created_at,
924 retry_exp_backoff: config.retry_exp_backoff,
925 max_retries: config.max_retries,
926 component_id: ComponentId::dummy_activity(),
927 scheduled_by: None,
928 })
929 .await
930 .unwrap();
931 tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
933 let execution_log = db_connection.get(&config.execution_id).await.unwrap();
934 debug!("Execution history after tick: {execution_log:?}");
935 assert_matches!(
937 execution_log.events.first().unwrap(),
938 ExecutionEvent {
939 event: ExecutionEventInner::Created { .. },
940 created_at: actually_created_at,
941 backtrace_id: None,
942 }
943 if config.created_at == *actually_created_at
944 );
945 let locked_at = assert_matches!(
946 execution_log.events.get(1).unwrap(),
947 ExecutionEvent {
948 event: ExecutionEventInner::Locked { .. },
949 created_at: locked_at,
950 backtrace_id: None,
951 } if config.created_at <= *locked_at
952 => *locked_at
953 );
954 assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
955 event: _,
956 created_at: executed_at,
957 backtrace_id: None,
958 } if *executed_at >= locked_at);
959 execution_log
960 }
961
962 #[expect(clippy::too_many_lines)]
963 #[tokio::test]
964 async fn activity_trap_should_trigger_an_execution_retry() {
965 set_up();
966 let sim_clock = SimClock::default();
967 let (_guard, db_pool) = Database::Memory.set_up().await;
968 let exec_config = ExecConfig {
969 batch_size: 1,
970 lock_expiry: Duration::from_secs(1),
971 tick_sleep: Duration::ZERO,
972 component_id: ComponentId::dummy_activity(),
973 task_limiter: None,
974 };
975 let expected_reason = "error reason";
976 let expected_detail = "error detail";
977 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
978 WorkerError::ActivityTrap {
979 reason: expected_reason.to_string(),
980 trap_kind: concepts::TrapKind::Trap,
981 detail: expected_detail.to_string(),
982 version: Version::new(2),
983 http_client_traces: None,
984 },
985 )));
986 let retry_exp_backoff = Duration::from_millis(100);
987 debug!(now = %sim_clock.now(), "Creating an execution that should fail");
988 let execution_log = create_and_tick(
989 CreateAndTickConfig {
990 execution_id: ExecutionId::generate(),
991 created_at: sim_clock.now(),
992 max_retries: 1,
993 executed_at: sim_clock.now(),
994 retry_exp_backoff,
995 },
996 sim_clock.clone(),
997 db_pool.clone(),
998 exec_config.clone(),
999 worker,
1000 tick_fn,
1001 )
1002 .await;
1003 assert_eq!(3, execution_log.events.len());
1004 {
1005 let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
1006 &execution_log.events.get(2).unwrap(),
1007 ExecutionEvent {
1008 event: ExecutionEventInner::TemporarilyFailed {
1009 reason_inner,
1010 reason_full,
1011 detail,
1012 backoff_expires_at,
1013 http_client_traces: None,
1014 },
1015 created_at: at,
1016 backtrace_id: None,
1017 }
1018 => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
1019 );
1020 assert_eq!(expected_reason, reason_inner.deref());
1021 assert_eq!(
1022 format!("activity trap: {expected_reason}"),
1023 reason_full.deref()
1024 );
1025 assert_eq!(Some(expected_detail), detail.as_deref());
1026 assert_eq!(at, sim_clock.now());
1027 assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
1028 }
1029 let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
1030 std::sync::Mutex::new(IndexMap::from([(
1031 Version::new(4),
1032 (
1033 vec![],
1034 WorkerResult::Ok(SupportedFunctionReturnValue::None, Version::new(4), None),
1035 ),
1036 )])),
1037 )));
1038 assert!(
1040 tick_fn(
1041 exec_config.clone(),
1042 sim_clock.clone(),
1043 db_pool.clone(),
1044 worker.clone(),
1045 sim_clock.now(),
1046 )
1047 .await
1048 .executions
1049 .is_empty()
1050 );
1051 sim_clock.move_time_forward(retry_exp_backoff).await;
1053 tick_fn(
1054 exec_config,
1055 sim_clock.clone(),
1056 db_pool.clone(),
1057 worker,
1058 sim_clock.now(),
1059 )
1060 .await;
1061 let execution_log = {
1062 let db_connection = db_pool.connection();
1063 db_connection
1064 .get(&execution_log.execution_id)
1065 .await
1066 .unwrap()
1067 };
1068 debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
1069 assert_matches!(
1070 execution_log.events.get(3).unwrap(),
1071 ExecutionEvent {
1072 event: ExecutionEventInner::Locked { .. },
1073 created_at: at,
1074 backtrace_id: None,
1075 } if *at == sim_clock.now()
1076 );
1077 assert_matches!(
1078 execution_log.events.get(4).unwrap(),
1079 ExecutionEvent {
1080 event: ExecutionEventInner::Finished {
1081 result: Ok(SupportedFunctionReturnValue::None),
1082 http_client_traces: None
1083 },
1084 created_at: finished_at,
1085 backtrace_id: None,
1086 } if *finished_at == sim_clock.now()
1087 );
1088 db_pool.close().await.unwrap();
1089 }
1090
1091 #[tokio::test]
1092 async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1093 set_up();
1094 let created_at = Now.now();
1095 let clock_fn = ConstClock(created_at);
1096 let (_guard, db_pool) = Database::Memory.set_up().await;
1097 let exec_config = ExecConfig {
1098 batch_size: 1,
1099 lock_expiry: Duration::from_secs(1),
1100 tick_sleep: Duration::ZERO,
1101 component_id: ComponentId::dummy_activity(),
1102 task_limiter: None,
1103 };
1104
1105 let expected_reason = "error reason";
1106 let expected_detail = "error detail";
1107 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1108 WorkerError::ActivityTrap {
1109 reason: expected_reason.to_string(),
1110 trap_kind: concepts::TrapKind::Trap,
1111 detail: expected_detail.to_string(),
1112 version: Version::new(2),
1113 http_client_traces: None,
1114 },
1115 )));
1116 let execution_log = create_and_tick(
1117 CreateAndTickConfig {
1118 execution_id: ExecutionId::generate(),
1119 created_at,
1120 max_retries: 0,
1121 executed_at: created_at,
1122 retry_exp_backoff: Duration::ZERO,
1123 },
1124 clock_fn,
1125 db_pool.clone(),
1126 exec_config.clone(),
1127 worker,
1128 tick_fn,
1129 )
1130 .await;
1131 assert_eq!(3, execution_log.events.len());
1132 let (reason, kind, detail) = assert_matches!(
1133 &execution_log.events.get(2).unwrap(),
1134 ExecutionEvent {
1135 event: ExecutionEventInner::Finished{
1136 result: Err(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_}),
1137 http_client_traces: None
1138 },
1139 created_at: at,
1140 backtrace_id: None,
1141 } if *at == created_at
1142 => (reason_inner, kind, detail)
1143 );
1144 assert_eq!(expected_reason, *reason);
1145 assert_eq!(Some(expected_detail), detail.as_deref());
1146 assert_eq!(PermanentFailureKind::ActivityTrap, *kind);
1147
1148 db_pool.close().await.unwrap();
1149 }
1150
1151 #[tokio::test]
1152 async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1153 let worker_error = WorkerError::ActivityTrap {
1154 reason: "error reason".to_string(),
1155 trap_kind: TrapKind::Trap,
1156 detail: "detail".to_string(),
1157 version: Version::new(2),
1158 http_client_traces: None,
1159 };
1160 let expected_child_err = FinishedExecutionError::PermanentFailure {
1161 reason_full: "activity trap: error reason".to_string(),
1162 reason_inner: "error reason".to_string(),
1163 kind: PermanentFailureKind::ActivityTrap,
1164 detail: Some("detail".to_string()),
1165 };
1166 child_execution_permanently_failed_should_notify_parent(
1167 WorkerResult::Err(worker_error),
1168 expected_child_err,
1169 )
1170 .await;
1171 }
1172
1173 #[tokio::test]
1176 async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1177 let expected_child_err = FinishedExecutionError::PermanentTimeout;
1178 child_execution_permanently_failed_should_notify_parent(
1179 WorkerResult::DbUpdatedByWorkerOrWatcher,
1180 expected_child_err,
1181 )
1182 .await;
1183 }
1184
1185 #[tokio::test]
1186 async fn child_execution_permanently_failed_should_notify_parent_unhandled_child() {
1187 let parent_id = ExecutionId::from_parts(1, 1);
1188 let join_set_id_outer =
1189 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("outer")).unwrap();
1190 let root_cause_id = parent_id.next_level(&join_set_id_outer);
1191 let join_set_id_inner =
1192 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("inner")).unwrap();
1193 let child_execution_id = root_cause_id.next_level(&join_set_id_inner);
1194 let worker_error = WorkerError::FatalError(
1195 FatalError::UnhandledChildExecutionError {
1196 child_execution_id: child_execution_id.clone(),
1197 root_cause_id: root_cause_id.clone(),
1198 },
1199 Version::new(2),
1200 );
1201 let expected_child_err = FinishedExecutionError::UnhandledChildExecutionError {
1202 child_execution_id,
1203 root_cause_id,
1204 };
1205 child_execution_permanently_failed_should_notify_parent(
1206 WorkerResult::Err(worker_error),
1207 expected_child_err,
1208 )
1209 .await;
1210 }
1211
    /// Shared scenario for the `child_execution_permanently_failed_should_notify_parent_*` tests.
    ///
    /// Steps:
    /// 1. Creates a parent execution (`max_retries: 0`) and ticks it with a worker returning
    ///    `WorkerResult::DbUpdatedByWorkerOrWatcher`.
    /// 2. In one batch, appends to the parent a one-off join set, a child execution request and
    ///    a `JoinNext`, and creates the child (`max_retries: 0`).
    /// 3. Ticks the child with a worker returning `worker_result`.
    /// 4. When `expected_child_err` is `PermanentTimeout`, advances the simulated clock by the
    ///    lock expiry and ticks the expired-timers watcher.
    ///
    /// Assertions:
    /// - the child is finished with `Err(expected_child_err)`;
    /// - the parent is back to `PendingAt` the current simulated time;
    /// - the parent's last response is a `ChildExecutionFinished` for this child, carrying an
    ///   error result.
    #[expect(clippy::too_many_lines)]
    async fn child_execution_permanently_failed_should_notify_parent(
        worker_result: WorkerResult,
        expected_child_err: FinishedExecutionError,
    ) {
        use concepts::storage::JoinSetResponseEventOuter;
        // Used as the lock expiry for both ticks, and as the clock advance in the timeout case.
        const LOCK_EXPIRY: Duration = Duration::from_secs(1);

        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;

        // The parent's worker does not return a result of its own; the parent's join-set
        // history is appended manually below.
        let parent_worker = Arc::new(SimpleWorker::with_single_result(
            WorkerResult::DbUpdatedByWorkerOrWatcher,
        ));
        let parent_execution_id = ExecutionId::generate();
        db_pool
            .connection()
            .create(CreateRequest {
                created_at: sim_clock.now(),
                execution_id: parent_execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: Duration::ZERO,
                max_retries: 0,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();
        tick_fn(
            ExecConfig {
                batch_size: 1,
                lock_expiry: LOCK_EXPIRY,
                tick_sleep: Duration::ZERO,
                component_id: ComponentId::dummy_activity(),
                task_limiter: None,
            },
            sim_clock.clone(),
            db_pool.clone(),
            parent_worker,
            sim_clock.now(),
        )
        .await;

        let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
        let child_execution_id = parent_execution_id.next_level(&join_set_id);
        // Append the parent's join set, child request and JoinNext, and create the child in the
        // same batch.
        {
            let child = CreateRequest {
                created_at: sim_clock.now(),
                execution_id: ExecutionId::Derived(child_execution_id.clone()),
                ffqn: FFQN_CHILD,
                params: Params::empty(),
                parent: Some((parent_execution_id.clone(), join_set_id.clone())),
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: Duration::ZERO,
                max_retries: 0,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            };
            let current_time = sim_clock.now();
            let join_set = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinSetCreate {
                        join_set_id: join_set_id.clone(),
                        closing_strategy: ClosingStrategy::Complete,
                    },
                },
            };
            let child_exec_req = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinSetRequest {
                        join_set_id: join_set_id.clone(),
                        request: JoinSetRequest::ChildExecutionRequest {
                            child_execution_id: child_execution_id.clone(),
                        },
                    },
                },
            };
            let join_next = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinNext {
                        join_set_id: join_set_id.clone(),
                        run_expires_at: sim_clock.now(),
                        closing: false,
                    },
                },
            };
            // NOTE(review): `Version::new(2)` assumes the parent is at version 2 after
            // create + lock by the tick above. Confirm this if the tick's behavior changes.
            db_pool
                .connection()
                .append_batch_create_new_execution(
                    current_time,
                    vec![join_set, child_exec_req, join_next],
                    parent_execution_id.clone(),
                    Version::new(2),
                    vec![child],
                )
                .await
                .unwrap();
        }

        // Run the child with the scenario-specific worker result.
        let child_worker =
            Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));

        tick_fn(
            ExecConfig {
                batch_size: 1,
                lock_expiry: LOCK_EXPIRY,
                tick_sleep: Duration::ZERO,
                component_id: ComponentId::dummy_activity(),
                task_limiter: None,
            },
            sim_clock.clone(),
            db_pool.clone(),
            child_worker,
            sim_clock.now(),
        )
        .await;
        // Timeout scenario: the child's worker did not write a result, so let the lock expire.
        // Then let the expired-timers watcher finish the child (no retries are configured).
        if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
            sim_clock.move_time_forward(LOCK_EXPIRY).await;
            expired_timers_watcher::tick(&db_pool.connection(), sim_clock.now())
                .await
                .unwrap();
        }
        let child_log = db_pool
            .connection()
            .get(&ExecutionId::Derived(child_execution_id.clone()))
            .await
            .unwrap();
        assert!(child_log.pending_state.is_finished());
        assert_eq!(
            Version(2),
            child_log.next_version,
            "created = 0, locked = 1, with_single_result = 2"
        );
        assert_eq!(
            ExecutionEventInner::Finished {
                result: Err(expected_child_err),
                http_client_traces: None
            },
            child_log.last_event().event
        );
        // The child's result must have been delivered to the parent, making it pending again.
        let parent_log = db_pool
            .connection()
            .get(&parent_execution_id)
            .await
            .unwrap();
        assert_matches!(
            parent_log.pending_state,
            PendingState::PendingAt {
                scheduled_at
            } if scheduled_at == sim_clock.now(),
            "parent should be back to pending"
        );
        let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
            parent_log.responses.last(),
            Some(JoinSetResponseEventOuter{
                created_at: at,
                event: JoinSetResponseEvent{
                    join_set_id: found_join_set_id,
                    event: JoinSetResponse::ChildExecutionFinished {
                        child_execution_id: found_child_execution_id,
                        finished_version,
                        result: found_result,
                    }
                }
            })
            if *at == sim_clock.now()
            => (found_join_set_id, found_child_execution_id, finished_version, found_result)
        );
        assert_eq!(join_set_id, *found_join_set_id);
        assert_eq!(child_execution_id, *found_child_execution_id);
        // The response's finished version must match the child's final version.
        assert_eq!(child_log.next_version, *child_finished_version);
        // Only the error-ness of the delivered result is checked, not its exact value.
        assert!(found_result.is_err());

        db_pool.close().await.unwrap();
    }
1399
1400 #[derive(Clone, Debug)]
1401 struct SleepyWorker {
1402 duration: Duration,
1403 result: SupportedFunctionReturnValue,
1404 exported: [FunctionMetadata; 1],
1405 }
1406
1407 #[async_trait]
1408 impl Worker for SleepyWorker {
1409 async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1410 tokio::time::sleep(self.duration).await;
1411 WorkerResult::Ok(self.result.clone(), ctx.version, None)
1412 }
1413
1414 fn exported_functions(&self) -> &[FunctionMetadata] {
1415 &self.exported
1416 }
1417
1418 fn imported_functions(&self) -> &[FunctionMetadata] {
1419 &[]
1420 }
1421 }
1422
1423 #[expect(clippy::too_many_lines)]
1424 #[tokio::test]
1425 async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1426 set_up();
1427 let sim_clock = SimClock::default();
1428 let (_guard, db_pool) = Database::Memory.set_up().await;
1429 let lock_expiry = Duration::from_millis(100);
1430 let exec_config = ExecConfig {
1431 batch_size: 1,
1432 lock_expiry,
1433 tick_sleep: Duration::ZERO,
1434 component_id: ComponentId::dummy_activity(),
1435 task_limiter: None,
1436 };
1437
1438 let worker = Arc::new(SleepyWorker {
1439 duration: lock_expiry + Duration::from_millis(1), result: SupportedFunctionReturnValue::None,
1441 exported: [FunctionMetadata {
1442 ffqn: FFQN_SOME,
1443 parameter_types: ParameterTypes::default(),
1444 return_type: None,
1445 extension: None,
1446 submittable: true,
1447 }],
1448 });
1449 let execution_id = ExecutionId::generate();
1451 let timeout_duration = Duration::from_millis(300);
1452 let db_connection = db_pool.connection();
1453 db_connection
1454 .create(CreateRequest {
1455 created_at: sim_clock.now(),
1456 execution_id: execution_id.clone(),
1457 ffqn: FFQN_SOME,
1458 params: Params::empty(),
1459 parent: None,
1460 metadata: concepts::ExecutionMetadata::empty(),
1461 scheduled_at: sim_clock.now(),
1462 retry_exp_backoff: timeout_duration,
1463 max_retries: 1,
1464 component_id: ComponentId::dummy_activity(),
1465 scheduled_by: None,
1466 })
1467 .await
1468 .unwrap();
1469
1470 let ffqns = super::extract_ffqns(worker.as_ref());
1471 let executor = ExecTask::new(
1472 worker,
1473 exec_config,
1474 sim_clock.clone(),
1475 db_pool.clone(),
1476 ffqns,
1477 );
1478 let mut first_execution_progress = executor.tick(sim_clock.now()).await.unwrap();
1479 assert_eq!(1, first_execution_progress.executions.len());
1480 sim_clock.move_time_forward(lock_expiry).await;
1482 let now_after_first_lock_expiry = sim_clock.now();
1484 {
1485 debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1486 let cleanup_progress = executor.tick(now_after_first_lock_expiry).await.unwrap();
1487 assert!(cleanup_progress.executions.is_empty());
1488 }
1489 {
1490 let expired_locks =
1491 expired_timers_watcher::tick(&db_pool.connection(), now_after_first_lock_expiry)
1492 .await
1493 .unwrap()
1494 .expired_locks;
1495 assert_eq!(1, expired_locks);
1496 }
1497 assert!(
1498 !first_execution_progress
1499 .executions
1500 .pop()
1501 .unwrap()
1502 .1
1503 .is_finished()
1504 );
1505
1506 let execution_log = db_connection.get(&execution_id).await.unwrap();
1507 let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1508 assert_matches!(
1509 &execution_log.events.get(2).unwrap(),
1510 ExecutionEvent {
1511 event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
1512 created_at: at,
1513 backtrace_id: None,
1514 } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1515 );
1516 assert_eq!(
1517 PendingState::PendingAt {
1518 scheduled_at: expected_first_timeout_expiry
1519 },
1520 execution_log.pending_state
1521 );
1522 sim_clock.move_time_forward(timeout_duration).await;
1523 let now_after_first_timeout = sim_clock.now();
1524 debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1525
1526 let mut second_execution_progress = executor.tick(now_after_first_timeout).await.unwrap();
1527 assert_eq!(1, second_execution_progress.executions.len());
1528
1529 sim_clock.move_time_forward(lock_expiry).await;
1531 let now_after_second_lock_expiry = sim_clock.now();
1533 debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1534 {
1535 let cleanup_progress = executor.tick(now_after_second_lock_expiry).await.unwrap();
1536 assert!(cleanup_progress.executions.is_empty());
1537 }
1538 {
1539 let expired_locks =
1540 expired_timers_watcher::tick(&db_pool.connection(), now_after_second_lock_expiry)
1541 .await
1542 .unwrap()
1543 .expired_locks;
1544 assert_eq!(1, expired_locks);
1545 }
1546 assert!(
1547 !second_execution_progress
1548 .executions
1549 .pop()
1550 .unwrap()
1551 .1
1552 .is_finished()
1553 );
1554
1555 drop(db_connection);
1556 drop(executor);
1557 db_pool.close().await.unwrap();
1558 }
1559}