1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::storage::{
5 AppendRequest, DbPool, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
6 LockedExecution,
7};
8use concepts::time::ClockFn;
9use concepts::{ComponentId, FinishedExecutionResult, FunctionMetadata, StrVariant};
10use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
11use concepts::{
12 FinishedExecutionError,
13 storage::{DbConnection, DbError, ExecutionEventInner, JoinSetResponse, Version},
14};
15use concepts::{JoinSetId, PermanentFailureKind};
16use std::marker::PhantomData;
17use std::{
18 sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21 },
22 time::Duration,
23};
24use tokio::task::{AbortHandle, JoinHandle};
25use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
26
/// Configuration of a single executor task.
#[derive(Debug, Clone)]
pub struct ExecConfig {
    /// Added to the tick time to compute when locks taken during that tick expire.
    pub lock_expiry: Duration,
    /// Duration passed to `wait_for_pending` between two ticks of the executor loop.
    pub tick_sleep: Duration,
    /// Upper bound on the number of executions requested from the database per tick.
    pub batch_size: u32,
    /// Component id passed to `lock_pending` and recorded in tracing fields.
    pub component_id: ComponentId,
    /// Optional semaphore; when present, one permit is acquired per execution slot.
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
}
35
/// Executor that locks pending executions in the database and runs them on a [`Worker`].
pub struct ExecTask<C: ClockFn, DB: DbConnection, P: DbPool<DB>> {
    /// Worker whose `run` is invoked for every locked execution.
    worker: Arc<dyn Worker>,
    /// Executor configuration (batch size, lock expiry, ...).
    config: ExecConfig,
    /// Clock cloned into every spawned worker task to timestamp its result.
    clock_fn: C,
    /// Source of database connections.
    db_pool: P,
    /// Id of this executor; passed to `lock_pending` and recorded in tracing fields.
    executor_id: ExecutorId,
    /// Ties the otherwise unused `DB` type parameter to the struct.
    phantom_data: PhantomData<DB>,
    /// Function names passed to `lock_pending` on every tick.
    ffqns: Arc<[FunctionFqn]>,
}
45
/// Result of one executor tick: the worker tasks that were spawned by it.
#[derive(derive_more::Debug, Default)]
pub struct ExecutionProgress {
    /// Execution id paired with the join handle of the task running it.
    #[debug(skip)]
    #[allow(dead_code)]
    executions: Vec<(ExecutionId, JoinHandle<()>)>,
}
52
53impl ExecutionProgress {
54 #[cfg(feature = "test")]
55 pub async fn wait_for_tasks(self) -> Result<usize, tokio::task::JoinError> {
56 let execs = self.executions.len();
57 for (_, handle) in self.executions {
58 handle.await?;
59 }
60 Ok(execs)
61 }
62}
63
/// Handle to a spawned executor loop; aborts the task on drop unless it already finished.
#[derive(derive_more::Debug)]
pub struct ExecutorTaskHandle {
    /// Flag set by `close`; the executor loop checks it after each iteration.
    #[debug(skip)]
    is_closing: Arc<AtomicBool>,
    /// Abort handle of the spawned executor task.
    #[debug(skip)]
    abort_handle: AbortHandle,
    /// Component id, used in tracing fields.
    component_id: ComponentId,
    /// Executor id, used in tracing fields.
    executor_id: ExecutorId,
}
73
74impl ExecutorTaskHandle {
75 #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
76 pub async fn close(&self) {
77 trace!("Gracefully closing");
78 self.is_closing.store(true, Ordering::Relaxed);
79 while !self.abort_handle.is_finished() {
80 tokio::time::sleep(Duration::from_millis(1)).await;
81 }
82 debug!("Gracefully closed");
83 }
84}
85
86impl Drop for ExecutorTaskHandle {
87 #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
88 fn drop(&mut self) {
89 if self.abort_handle.is_finished() {
90 return;
91 }
92 warn!(executor_id= %self.executor_id, component_id=%self.component_id, "Aborting the executor task");
93 self.abort_handle.abort();
94 }
95}
96
/// Public wrapper around `extract_exported_ffqns_noext`, compiled only with the `test` feature.
#[cfg(feature = "test")]
pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    extract_exported_ffqns_noext(worker)
}
101
102fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
103 worker
104 .exported_functions()
105 .iter()
106 .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
107 .collect::<Arc<_>>()
108}
109
110impl<C: ClockFn + 'static, DB: DbConnection + 'static, P: DbPool<DB> + 'static> ExecTask<C, DB, P> {
111 #[cfg(feature = "test")]
112 pub fn new(
113 worker: Arc<dyn Worker>,
114 config: ExecConfig,
115 clock_fn: C,
116 db_pool: P,
117 ffqns: Arc<[FunctionFqn]>,
118 ) -> Self {
119 Self {
120 worker,
121 config,
122 executor_id: ExecutorId::generate(),
123 db_pool,
124 phantom_data: PhantomData,
125 ffqns,
126 clock_fn,
127 }
128 }
129
130 pub fn spawn_new(
131 worker: Arc<dyn Worker>,
132 config: ExecConfig,
133 clock_fn: C,
134 db_pool: P,
135 executor_id: ExecutorId,
136 ) -> ExecutorTaskHandle {
137 let is_closing = Arc::new(AtomicBool::default());
138 let is_closing_inner = is_closing.clone();
139 let ffqns = extract_exported_ffqns_noext(worker.as_ref());
140 let component_id = config.component_id.clone();
141 let abort_handle = tokio::spawn(async move {
142 debug!(%executor_id, component_id = %config.component_id, "Spawned executor");
143 let task = Self {
144 worker,
145 config,
146 executor_id,
147 db_pool,
148 phantom_data: PhantomData,
149 ffqns: ffqns.clone(),
150 clock_fn: clock_fn.clone(),
151 };
152 loop {
153 let _ = task.tick(clock_fn.now()).await;
154 let executed_at = clock_fn.now();
155 task.db_pool
156 .connection()
157 .wait_for_pending(executed_at, ffqns.clone(), task.config.tick_sleep)
158 .await;
159 if is_closing_inner.load(Ordering::Relaxed) {
160 return;
161 }
162 }
163 })
164 .abort_handle();
165 ExecutorTaskHandle {
166 is_closing,
167 abort_handle,
168 component_id,
169 executor_id,
170 }
171 }
172
173 fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
174 if let Some(task_limiter) = &self.config.task_limiter {
175 let mut locks = Vec::new();
176 for _ in 0..self.config.batch_size {
177 if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
178 locks.push(Some(permit));
179 } else {
180 break;
181 }
182 }
183 locks
184 } else {
185 let mut vec = Vec::with_capacity(self.config.batch_size as usize);
186 for _ in 0..self.config.batch_size {
187 vec.push(None);
188 }
189 vec
190 }
191 }
192
    /// Public wrapper around `tick`, compiled only with the `test` feature.
    #[cfg(feature = "test")]
    pub async fn tick_test(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
        self.tick(executed_at).await
    }
197
198 #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.executor_id, component_id = %self.config.component_id))]
199 async fn tick(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
200 let locked_executions = {
201 let mut permits = self.acquire_task_permits();
202 if permits.is_empty() {
203 return Ok(ExecutionProgress::default());
204 }
205 let db_connection = self.db_pool.connection();
206 let lock_expires_at = executed_at + self.config.lock_expiry;
207 let locked_executions = db_connection
208 .lock_pending(
209 permits.len(), executed_at, self.ffqns.clone(),
212 executed_at, self.config.component_id.clone(),
214 self.executor_id,
215 lock_expires_at,
216 )
217 .await
218 .map_err(|err| {
219 warn!(executor_id = %self.executor_id, component_id = %self.config.component_id, "lock_pending error {err:?}");
220 })?;
221 while permits.len() > locked_executions.len() {
223 permits.pop();
224 }
225 assert_eq!(permits.len(), locked_executions.len());
226 locked_executions.into_iter().zip(permits)
227 };
228 let execution_deadline = executed_at + self.config.lock_expiry;
229
230 let mut executions = Vec::with_capacity(locked_executions.len());
231 for (locked_execution, permit) in locked_executions {
232 let execution_id = locked_execution.execution_id.clone();
233 let join_handle = {
234 let worker = self.worker.clone();
235 let db_pool = self.db_pool.clone();
236 let clock_fn = self.clock_fn.clone();
237 let run_id = locked_execution.run_id;
238 let worker_span = info_span!(parent: None, "worker",
239 "otel.name" = format!("worker {}", locked_execution.ffqn),
240 %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.executor_id, component_id = %self.config.component_id);
241 locked_execution.metadata.enrich(&worker_span);
242 tokio::spawn({
243 let worker_span2 = worker_span.clone();
244 async move {
245 let res = Self::run_worker(
246 worker,
247 &db_pool,
248 execution_deadline,
249 clock_fn,
250 locked_execution,
251 worker_span2,
252 )
253 .await;
254 if let Err(db_error) = res {
255 error!("Execution will be timed out not writing `{db_error:?}`");
256 }
257 drop(permit);
258 }
259 .instrument(worker_span)
260 })
261 };
262 executions.push((execution_id, join_handle));
263 }
264 Ok(ExecutionProgress { executions })
265 }
266
267 async fn run_worker(
268 worker: Arc<dyn Worker>,
269 db_pool: &P,
270 execution_deadline: DateTime<Utc>,
271 clock_fn: C,
272 locked_execution: LockedExecution,
273 worker_span: Span,
274 ) -> Result<(), DbError> {
275 debug!("Worker::run starting");
276 trace!(
277 version = %locked_execution.version,
278 params = ?locked_execution.params,
279 event_history = ?locked_execution.event_history,
280 "Worker::run starting"
281 );
282 let can_be_retried = ExecutionLog::can_be_retried_after(
283 locked_execution.temporary_event_count + 1,
284 locked_execution.max_retries,
285 locked_execution.retry_exp_backoff,
286 );
287 let unlock_expiry_on_limit_reached =
288 ExecutionLog::compute_retry_duration_when_retrying_forever(
289 locked_execution.temporary_event_count + 1,
290 locked_execution.retry_exp_backoff,
291 );
292 let ctx = WorkerContext {
293 execution_id: locked_execution.execution_id.clone(),
294 metadata: locked_execution.metadata,
295 ffqn: locked_execution.ffqn,
296 params: locked_execution.params,
297 event_history: locked_execution.event_history,
298 responses: locked_execution
299 .responses
300 .into_iter()
301 .map(|outer| outer.event)
302 .collect(),
303 version: locked_execution.version,
304 execution_deadline,
305 can_be_retried: can_be_retried.is_some(),
306 run_id: locked_execution.run_id,
307 worker_span,
308 };
309 let worker_result = worker.run(ctx).await;
310 trace!(?worker_result, "Worker::run finished");
311 let result_obtained_at = clock_fn.now();
312 match Self::worker_result_to_execution_event(
313 locked_execution.execution_id,
314 worker_result,
315 result_obtained_at,
316 locked_execution.parent,
317 can_be_retried,
318 unlock_expiry_on_limit_reached,
319 )? {
320 Some(append) => {
321 let db_connection = db_pool.connection();
322 trace!("Appending {append:?}");
323 append.clone().append(&db_connection).await
324 }
325 None => Ok(()),
326 }
327 }
328
329 #[expect(clippy::too_many_lines)]
332 fn worker_result_to_execution_event(
333 execution_id: ExecutionId,
334 worker_result: WorkerResult,
335 result_obtained_at: DateTime<Utc>,
336 parent: Option<(ExecutionId, JoinSetId)>,
337 can_be_retried: Option<Duration>,
338 unlock_expiry_on_limit_reached: Duration,
339 ) -> Result<Option<Append>, DbError> {
340 Ok(match worker_result {
341 WorkerResult::Ok(result, new_version, http_client_traces) => {
342 info!(
343 "Execution finished: {}",
344 result.as_pending_state_finished_result()
345 );
346 let child_finished =
347 parent.map(
348 |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
349 parent_execution_id,
350 parent_join_set,
351 result: Ok(result.clone()),
352 },
353 );
354 let primary_event = AppendRequest {
355 created_at: result_obtained_at,
356 event: ExecutionEventInner::Finished {
357 result: Ok(result),
358 http_client_traces,
359 },
360 };
361
362 Some(Append {
363 created_at: result_obtained_at,
364 primary_event,
365 execution_id,
366 version: new_version,
367 child_finished,
368 })
369 }
370 WorkerResult::DbUpdatedByWorkerOrWatcher => None,
371 WorkerResult::Err(err) => {
372 let reason = err.to_string();
373 let (primary_event, child_finished, version) = match err {
374 WorkerError::TemporaryTimeout {
375 http_client_traces,
376 version,
377 } => {
378 if let Some(duration) = can_be_retried {
379 let backoff_expires_at = result_obtained_at + duration;
380 info!(
381 "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
382 );
383 (
384 ExecutionEventInner::TemporarilyTimedOut {
385 backoff_expires_at,
386 http_client_traces,
387 },
388 None,
389 version,
390 )
391 } else {
392 info!("Permanent timeout");
393 let result = Err(FinishedExecutionError::PermanentTimeout);
394 let child_finished =
395 parent.map(|(parent_execution_id, parent_join_set)| {
396 ChildFinishedResponse {
397 parent_execution_id,
398 parent_join_set,
399 result: result.clone(),
400 }
401 });
402 (
403 ExecutionEventInner::Finished {
404 result,
405 http_client_traces,
406 },
407 child_finished,
408 version,
409 )
410 }
411 }
412 WorkerError::DbError(db_error) => {
413 return Err(db_error);
414 }
415 WorkerError::ActivityTrap {
416 reason: reason_inner,
417 trap_kind,
418 detail,
419 version,
420 http_client_traces,
421 } => {
422 if let Some(duration) = can_be_retried {
423 let expires_at = result_obtained_at + duration;
424 debug!(
425 "Retrying activity {trap_kind} execution after {duration:?} at {expires_at}"
426 );
427 (
428 ExecutionEventInner::TemporarilyFailed {
429 backoff_expires_at: expires_at,
430 reason_full: StrVariant::from(reason),
431 reason_inner: StrVariant::from(reason_inner),
432 detail: Some(detail),
433 http_client_traces,
434 },
435 None,
436 version,
437 )
438 } else {
439 info!(
440 "Activity {trap_kind} marked as permanent failure - {reason_inner}"
441 );
442 let result = Err(FinishedExecutionError::PermanentFailure {
443 reason_inner,
444 reason_full: reason,
445 kind: PermanentFailureKind::ActivityTrap,
446 detail: Some(detail),
447 });
448 let child_finished =
449 parent.map(|(parent_execution_id, parent_join_set)| {
450 ChildFinishedResponse {
451 parent_execution_id,
452 parent_join_set,
453 result: result.clone(),
454 }
455 });
456 (
457 ExecutionEventInner::Finished {
458 result,
459 http_client_traces,
460 },
461 child_finished,
462 version,
463 )
464 }
465 }
466 WorkerError::ActivityReturnedError {
467 detail,
468 version,
469 http_client_traces,
470 } => {
471 let duration = can_be_retried.expect(
472 "ActivityReturnedError must not be returned when retries are exhausted",
473 );
474 let expires_at = result_obtained_at + duration;
475 debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
476 (
477 ExecutionEventInner::TemporarilyFailed {
478 backoff_expires_at: expires_at,
479 reason_full: StrVariant::Static("activity returned error"),
480 reason_inner: StrVariant::Static("activity returned error"),
481 detail,
482 http_client_traces,
483 },
484 None,
485 version,
486 )
487 }
488 WorkerError::TemporaryWorkflowTrap {
489 reason: reason_inner,
490 kind,
491 detail,
492 version,
493 } => {
494 let duration = can_be_retried.expect("workflows are retried forever");
495 let expires_at = result_obtained_at + duration;
496 debug!(
497 "Retrying workflow {kind} execution after {duration:?} at {expires_at}"
498 );
499 (
500 ExecutionEventInner::TemporarilyFailed {
501 backoff_expires_at: expires_at,
502 reason_full: StrVariant::from(reason),
503 reason_inner: StrVariant::from(reason_inner),
504 detail,
505 http_client_traces: None,
506 },
507 None,
508 version,
509 )
510 }
511 WorkerError::LimitReached {
512 reason: inner_reason,
513 version: new_version,
514 } => {
515 let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
516 warn!(
517 "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
518 );
519 (
520 ExecutionEventInner::Unlocked {
521 backoff_expires_at: expires_at,
522 reason: StrVariant::from(reason),
523 },
524 None,
525 new_version,
526 )
527 }
528 WorkerError::FatalError(fatal_error, version) => {
529 info!("Fatal worker error - {fatal_error:?}");
530 let result = Err(FinishedExecutionError::from(fatal_error));
531 let child_finished =
532 parent.map(|(parent_execution_id, parent_join_set)| {
533 ChildFinishedResponse {
534 parent_execution_id,
535 parent_join_set,
536 result: result.clone(),
537 }
538 });
539 (
540 ExecutionEventInner::Finished {
541 result,
542 http_client_traces: None,
543 },
544 child_finished,
545 version,
546 )
547 }
548 };
549 Some(Append {
550 created_at: result_obtained_at,
551 primary_event: AppendRequest {
552 created_at: result_obtained_at,
553 event: primary_event,
554 },
555 execution_id,
556 version,
557 child_finished,
558 })
559 }
560 })
561 }
562}
563
/// Data needed to tell a parent execution that one of its children finished.
#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    /// Execution that receives the response.
    pub(crate) parent_execution_id: ExecutionId,
    /// Join set of the parent to which the response is added.
    pub(crate) parent_join_set: JoinSetId,
    /// Result of the finished child.
    pub(crate) result: FinishedExecutionResult,
}
570
/// A pending database write describing the outcome of one worker run.
#[derive(Debug, Clone)]
pub(crate) struct Append {
    /// Timestamp passed to the database call and used for the parent response.
    pub(crate) created_at: DateTime<Utc>,
    /// Event appended to the execution's own log.
    pub(crate) primary_event: AppendRequest,
    /// Execution the event belongs to.
    pub(crate) execution_id: ExecutionId,
    /// Version passed to the database append call.
    pub(crate) version: Version,
    /// When set, the parent is notified as part of the same database call.
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}
579
580impl Append {
581 pub(crate) async fn append(self, db_connection: &impl DbConnection) -> Result<(), DbError> {
582 if let Some(child_finished) = self.child_finished {
583 assert_matches!(
584 &self.primary_event,
585 AppendRequest {
586 event: ExecutionEventInner::Finished { .. },
587 ..
588 }
589 );
590 let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
591 db_connection
592 .append_batch_respond_to_parent(
593 derived.clone(),
594 self.created_at,
595 vec![self.primary_event],
596 self.version.clone(),
597 child_finished.parent_execution_id,
598 JoinSetResponseEventOuter {
599 created_at: self.created_at,
600 event: JoinSetResponseEvent {
601 join_set_id: child_finished.parent_join_set,
602 event: JoinSetResponse::ChildExecutionFinished {
603 child_execution_id: derived,
604 finished_version: self.version,
606 result: child_finished.result,
607 },
608 },
609 },
610 )
611 .await?;
612 } else {
613 db_connection
614 .append_batch(
615 self.created_at,
616 vec![self.primary_event],
617 self.execution_id,
618 self.version,
619 )
620 .await?;
621 }
622 Ok(())
623 }
624}
625
/// Scripted [`Worker`](crate::worker::Worker) used by tests.
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    /// Function name exported by default by [`SimpleWorker`].
    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");
    /// Scripted results keyed by the expected version; consumed from the back.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    /// Worker that replays pre-recorded results instead of executing anything.
    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    impl SimpleWorker {
        /// Metadata of the single exported function named `ffqn`.
        fn exported_metadata(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
            [FunctionMetadata {
                ffqn,
                parameter_types: ParameterTypes::default(),
                return_type: None,
                extension: None,
                submittable: true,
            }]
        }

        /// Worker with one scripted result, expected at version 2 with an empty history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let mut scripted = IndexMap::new();
            scripted.insert(Version::new(2), (vec![], res));
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(scripted)))
        }

        /// Same scripted results, but exporting `ffqn` instead of the current function.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            let exported = Self::exported_metadata(ffqn.clone());
            Self {
                worker_results_rev: self.worker_results_rev,
                ffqn,
                exported,
            }
        }

        /// Worker exporting [`FFQN_SOME`] that replays `worker_results_rev`.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: Self::exported_metadata(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        /// Pops the last scripted entry, checks it against `ctx` and returns its result.
        ///
        /// Panics when no entry is left or when the version or event history differ.
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            // Kept as one expression so the lock guard's lifetime matches the lookup.
            let (expected_version, (expected_eh, worker_result)) =
                self.worker_results_rev.lock().unwrap().pop().unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}
705
706#[cfg(test)]
707mod tests {
708 use self::simple_worker::SimpleWorker;
709 use super::*;
710 use crate::worker::FatalError;
711 use crate::{expired_timers_watcher, worker::WorkerResult};
712 use assert_matches::assert_matches;
713 use async_trait::async_trait;
714 use concepts::storage::{CreateRequest, JoinSetRequest};
715 use concepts::storage::{
716 DbConnection, ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState,
717 };
718 use concepts::time::Now;
719 use concepts::{
720 ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, StrVariant,
721 SupportedFunctionReturnValue, TrapKind,
722 };
723 use db_tests::Database;
724 use indexmap::IndexMap;
725 use simple_worker::FFQN_SOME;
726 use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
727 use test_utils::set_up;
728 use test_utils::sim_clock::{ConstClock, SimClock};
729
730 pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
731
732 async fn tick_fn<
733 W: Worker + Debug,
734 C: ClockFn + 'static,
735 DB: DbConnection + 'static,
736 P: DbPool<DB> + 'static,
737 >(
738 config: ExecConfig,
739 clock_fn: C,
740 db_pool: P,
741 worker: Arc<W>,
742 executed_at: DateTime<Utc>,
743 ) -> ExecutionProgress {
744 trace!("Ticking with {worker:?}");
745 let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
746 let executor = ExecTask::new(worker, config, clock_fn, db_pool, ffqns);
747 let mut execution_progress = executor.tick(executed_at).await.unwrap();
748 loop {
749 execution_progress
750 .executions
751 .retain(|(_, abort_handle)| !abort_handle.is_finished());
752 if execution_progress.executions.is_empty() {
753 return execution_progress;
754 }
755 tokio::time::sleep(Duration::from_millis(10)).await;
756 }
757 }
758
759 #[tokio::test]
760 async fn execute_simple_lifecycle_tick_based_mem() {
761 let created_at = Now.now();
762 let (_guard, db_pool) = Database::Memory.set_up().await;
763 execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
764 db_pool.close().await.unwrap();
765 }
766
767 #[cfg(not(madsim))]
768 #[tokio::test]
769 async fn execute_simple_lifecycle_tick_based_sqlite() {
770 let created_at = Now.now();
771 let (_guard, db_pool) = Database::Sqlite.set_up().await;
772 execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
773 db_pool.close().await.unwrap();
774 }
775
776 async fn execute_simple_lifecycle_tick_based<
777 DB: DbConnection + 'static,
778 P: DbPool<DB> + 'static,
779 C: ClockFn + 'static,
780 >(
781 pool: P,
782 clock_fn: C,
783 ) {
784 set_up();
785 let created_at = clock_fn.now();
786 let exec_config = ExecConfig {
787 batch_size: 1,
788 lock_expiry: Duration::from_secs(1),
789 tick_sleep: Duration::from_millis(100),
790 component_id: ComponentId::dummy_activity(),
791 task_limiter: None,
792 };
793
794 let execution_log = create_and_tick(
795 CreateAndTickConfig {
796 execution_id: ExecutionId::generate(),
797 created_at,
798 max_retries: 0,
799 executed_at: created_at,
800 retry_exp_backoff: Duration::ZERO,
801 },
802 clock_fn,
803 pool,
804 exec_config,
805 Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
806 SupportedFunctionReturnValue::None,
807 Version::new(2),
808 None,
809 ))),
810 tick_fn,
811 )
812 .await;
813 assert_matches!(
814 execution_log.events.get(2).unwrap(),
815 ExecutionEvent {
816 event: ExecutionEventInner::Finished {
817 result: Ok(SupportedFunctionReturnValue::None),
818 http_client_traces: None
819 },
820 created_at: _,
821 backtrace_id: None,
822 }
823 );
824 }
825
826 #[tokio::test]
827 async fn stochastic_execute_simple_lifecycle_task_based_mem() {
828 set_up();
829 let created_at = Now.now();
830 let clock_fn = ConstClock(created_at);
831 let (_guard, db_pool) = Database::Memory.set_up().await;
832 let exec_config = ExecConfig {
833 batch_size: 1,
834 lock_expiry: Duration::from_secs(1),
835 tick_sleep: Duration::ZERO,
836 component_id: ComponentId::dummy_activity(),
837 task_limiter: None,
838 };
839
840 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
841 SupportedFunctionReturnValue::None,
842 Version::new(2),
843 None,
844 )));
845 let exec_task = ExecTask::spawn_new(
846 worker.clone(),
847 exec_config.clone(),
848 clock_fn,
849 db_pool.clone(),
850 ExecutorId::generate(),
851 );
852
853 let execution_log = create_and_tick(
854 CreateAndTickConfig {
855 execution_id: ExecutionId::generate(),
856 created_at,
857 max_retries: 0,
858 executed_at: created_at,
859 retry_exp_backoff: Duration::ZERO,
860 },
861 clock_fn,
862 db_pool.clone(),
863 exec_config,
864 worker,
865 |_, _, _, _, _| async {
866 tokio::time::sleep(Duration::from_secs(1)).await; ExecutionProgress::default()
868 },
869 )
870 .await;
871 exec_task.close().await;
872 db_pool.close().await.unwrap();
873 assert_matches!(
874 execution_log.events.get(2).unwrap(),
875 ExecutionEvent {
876 event: ExecutionEventInner::Finished {
877 result: Ok(SupportedFunctionReturnValue::None),
878 http_client_traces: None
879 },
880 created_at: _,
881 backtrace_id: None,
882 }
883 );
884 }
885
    /// Parameters of `create_and_tick`.
    struct CreateAndTickConfig {
        /// Id of the execution to create.
        execution_id: ExecutionId,
        /// Used both as `created_at` and `scheduled_at` of the create request.
        created_at: DateTime<Utc>,
        /// Forwarded to the create request.
        max_retries: u32,
        /// Timestamp handed to the tick function.
        executed_at: DateTime<Utc>,
        /// Forwarded to the create request.
        retry_exp_backoff: Duration,
    }
893
894 async fn create_and_tick<
895 W: Worker,
896 C: ClockFn,
897 DB: DbConnection,
898 P: DbPool<DB>,
899 T: FnMut(ExecConfig, C, P, Arc<W>, DateTime<Utc>) -> F,
900 F: Future<Output = ExecutionProgress>,
901 >(
902 config: CreateAndTickConfig,
903 clock_fn: C,
904 db_pool: P,
905 exec_config: ExecConfig,
906 worker: Arc<W>,
907 mut tick: T,
908 ) -> ExecutionLog {
909 let db_connection = db_pool.connection();
911 db_connection
912 .create(CreateRequest {
913 created_at: config.created_at,
914 execution_id: config.execution_id.clone(),
915 ffqn: FFQN_SOME,
916 params: Params::empty(),
917 parent: None,
918 metadata: concepts::ExecutionMetadata::empty(),
919 scheduled_at: config.created_at,
920 retry_exp_backoff: config.retry_exp_backoff,
921 max_retries: config.max_retries,
922 component_id: ComponentId::dummy_activity(),
923 scheduled_by: None,
924 })
925 .await
926 .unwrap();
927 tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
929 let execution_log = db_connection.get(&config.execution_id).await.unwrap();
930 debug!("Execution history after tick: {execution_log:?}");
931 assert_matches!(
933 execution_log.events.first().unwrap(),
934 ExecutionEvent {
935 event: ExecutionEventInner::Created { .. },
936 created_at: actually_created_at,
937 backtrace_id: None,
938 }
939 if config.created_at == *actually_created_at
940 );
941 let locked_at = assert_matches!(
942 execution_log.events.get(1).unwrap(),
943 ExecutionEvent {
944 event: ExecutionEventInner::Locked { .. },
945 created_at: locked_at,
946 backtrace_id: None,
947 } if config.created_at <= *locked_at
948 => *locked_at
949 );
950 assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
951 event: _,
952 created_at: executed_at,
953 backtrace_id: None,
954 } if *executed_at >= locked_at);
955 execution_log
956 }
957
    /// With `max_retries: 1`, an activity trap must produce a `TemporarilyFailed` event and
    /// the execution must be locked and finished once the backoff has elapsed.
    #[expect(clippy::too_many_lines)]
    #[tokio::test]
    async fn activity_trap_should_trigger_an_execution_retry() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };
        let expected_reason = "error reason";
        let expected_detail = "error detail";
        // First attempt: the worker reports an activity trap.
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: expected_detail.to_string(),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        let retry_exp_backoff = Duration::from_millis(100);
        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at: sim_clock.now(),
                max_retries: 1,
                executed_at: sim_clock.now(),
                retry_exp_backoff,
            },
            sim_clock.clone(),
            db_pool.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        assert_eq!(3, execution_log.events.len());
        {
            // The third event must be the temporary failure with the expected backoff.
            let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
                &execution_log.events.get(2).unwrap(),
                ExecutionEvent {
                    event: ExecutionEventInner::TemporarilyFailed {
                        reason_inner,
                        reason_full,
                        detail,
                        backoff_expires_at,
                        http_client_traces: None,
                    },
                    created_at: at,
                    backtrace_id: None,
                }
                => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
            );
            assert_eq!(expected_reason, reason_inner.deref());
            assert_eq!(
                format!("activity trap: {expected_reason}"),
                reason_full.deref()
            );
            assert_eq!(Some(expected_detail), detail.as_deref());
            assert_eq!(at, sim_clock.now());
            assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
        }
        // Second attempt: the worker succeeds and expects to be run at version 4.
        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
            std::sync::Mutex::new(IndexMap::from([(
                Version::new(4),
                (
                    vec![],
                    WorkerResult::Ok(SupportedFunctionReturnValue::None, Version::new(4), None),
                ),
            )])),
        )));
        // Tick before the simulated clock has been advanced past the backoff.
        // NOTE(review): `tick_fn` only returns once its execution list is empty, so this
        // `is_empty()` check holds regardless of whether anything was locked.
        assert!(
            tick_fn(
                exec_config.clone(),
                sim_clock.clone(),
                db_pool.clone(),
                worker.clone(),
                sim_clock.now(),
            )
            .await
            .executions
            .is_empty()
        );
        // Advance the simulated clock by the backoff and tick again.
        sim_clock.move_time_forward(retry_exp_backoff).await;
        tick_fn(
            exec_config,
            sim_clock.clone(),
            db_pool.clone(),
            worker,
            sim_clock.now(),
        )
        .await;
        let execution_log = {
            let db_connection = db_pool.connection();
            db_connection
                .get(&execution_log.execution_id)
                .await
                .unwrap()
        };
        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
        assert_matches!(
            execution_log.events.get(3).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: at,
                backtrace_id: None,
            } if *at == sim_clock.now()
        );
        assert_matches!(
            execution_log.events.get(4).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Ok(SupportedFunctionReturnValue::None),
                    http_client_traces: None
                },
                created_at: finished_at,
                backtrace_id: None,
            } if *finished_at == sim_clock.now()
        );
        db_pool.close().await.unwrap();
    }
1086
1087 #[tokio::test]
1088 async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1089 set_up();
1090 let created_at = Now.now();
1091 let clock_fn = ConstClock(created_at);
1092 let (_guard, db_pool) = Database::Memory.set_up().await;
1093 let exec_config = ExecConfig {
1094 batch_size: 1,
1095 lock_expiry: Duration::from_secs(1),
1096 tick_sleep: Duration::ZERO,
1097 component_id: ComponentId::dummy_activity(),
1098 task_limiter: None,
1099 };
1100
1101 let expected_reason = "error reason";
1102 let expected_detail = "error detail";
1103 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1104 WorkerError::ActivityTrap {
1105 reason: expected_reason.to_string(),
1106 trap_kind: concepts::TrapKind::Trap,
1107 detail: expected_detail.to_string(),
1108 version: Version::new(2),
1109 http_client_traces: None,
1110 },
1111 )));
1112 let execution_log = create_and_tick(
1113 CreateAndTickConfig {
1114 execution_id: ExecutionId::generate(),
1115 created_at,
1116 max_retries: 0,
1117 executed_at: created_at,
1118 retry_exp_backoff: Duration::ZERO,
1119 },
1120 clock_fn,
1121 db_pool.clone(),
1122 exec_config.clone(),
1123 worker,
1124 tick_fn,
1125 )
1126 .await;
1127 assert_eq!(3, execution_log.events.len());
1128 let (reason, kind, detail) = assert_matches!(
1129 &execution_log.events.get(2).unwrap(),
1130 ExecutionEvent {
1131 event: ExecutionEventInner::Finished{
1132 result: Err(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_}),
1133 http_client_traces: None
1134 },
1135 created_at: at,
1136 backtrace_id: None,
1137 } if *at == created_at
1138 => (reason_inner, kind, detail)
1139 );
1140 assert_eq!(expected_reason, *reason);
1141 assert_eq!(Some(expected_detail), detail.as_deref());
1142 assert_eq!(PermanentFailureKind::ActivityTrap, *kind);
1143
1144 db_pool.close().await.unwrap();
1145 }
1146
1147 #[tokio::test]
1148 async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1149 let worker_error = WorkerError::ActivityTrap {
1150 reason: "error reason".to_string(),
1151 trap_kind: TrapKind::Trap,
1152 detail: "detail".to_string(),
1153 version: Version::new(2),
1154 http_client_traces: None,
1155 };
1156 let expected_child_err = FinishedExecutionError::PermanentFailure {
1157 reason_full: "activity trap: error reason".to_string(),
1158 reason_inner: "error reason".to_string(),
1159 kind: PermanentFailureKind::ActivityTrap,
1160 detail: Some("detail".to_string()),
1161 };
1162 child_execution_permanently_failed_should_notify_parent(
1163 WorkerResult::Err(worker_error),
1164 expected_child_err,
1165 )
1166 .await;
1167 }
1168
1169 #[tokio::test]
1172 async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1173 let expected_child_err = FinishedExecutionError::PermanentTimeout;
1174 child_execution_permanently_failed_should_notify_parent(
1175 WorkerResult::DbUpdatedByWorkerOrWatcher,
1176 expected_child_err,
1177 )
1178 .await;
1179 }
1180
1181 #[tokio::test]
1182 async fn child_execution_permanently_failed_should_notify_parent_unhandled_child() {
1183 let parent_id = ExecutionId::from_parts(1, 1);
1184 let join_set_id_outer =
1185 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("outer")).unwrap();
1186 let root_cause_id = parent_id.next_level(&join_set_id_outer);
1187 let join_set_id_inner =
1188 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("inner")).unwrap();
1189 let child_execution_id = root_cause_id.next_level(&join_set_id_inner);
1190 let worker_error = WorkerError::FatalError(
1191 FatalError::UnhandledChildExecutionError {
1192 child_execution_id: child_execution_id.clone(),
1193 root_cause_id: root_cause_id.clone(),
1194 },
1195 Version::new(2),
1196 );
1197 let expected_child_err = FinishedExecutionError::UnhandledChildExecutionError {
1198 child_execution_id,
1199 root_cause_id,
1200 };
1201 child_execution_permanently_failed_should_notify_parent(
1202 WorkerResult::Err(worker_error),
1203 expected_child_err,
1204 )
1205 .await;
1206 }
1207
1208 #[expect(clippy::too_many_lines)]
1209 async fn child_execution_permanently_failed_should_notify_parent(
1210 worker_result: WorkerResult,
1211 expected_child_err: FinishedExecutionError,
1212 ) {
1213 use concepts::storage::JoinSetResponseEventOuter;
1214 const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1215
1216 set_up();
1217 let sim_clock = SimClock::default();
1218 let (_guard, db_pool) = Database::Memory.set_up().await;
1219
1220 let parent_worker = Arc::new(SimpleWorker::with_single_result(
1221 WorkerResult::DbUpdatedByWorkerOrWatcher,
1222 ));
1223 let parent_execution_id = ExecutionId::generate();
1224 db_pool
1225 .connection()
1226 .create(CreateRequest {
1227 created_at: sim_clock.now(),
1228 execution_id: parent_execution_id.clone(),
1229 ffqn: FFQN_SOME,
1230 params: Params::empty(),
1231 parent: None,
1232 metadata: concepts::ExecutionMetadata::empty(),
1233 scheduled_at: sim_clock.now(),
1234 retry_exp_backoff: Duration::ZERO,
1235 max_retries: 0,
1236 component_id: ComponentId::dummy_activity(),
1237 scheduled_by: None,
1238 })
1239 .await
1240 .unwrap();
1241 tick_fn(
1242 ExecConfig {
1243 batch_size: 1,
1244 lock_expiry: LOCK_EXPIRY,
1245 tick_sleep: Duration::ZERO,
1246 component_id: ComponentId::dummy_activity(),
1247 task_limiter: None,
1248 },
1249 sim_clock.clone(),
1250 db_pool.clone(),
1251 parent_worker,
1252 sim_clock.now(),
1253 )
1254 .await;
1255
1256 let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1257 let child_execution_id = parent_execution_id.next_level(&join_set_id);
1258 {
1260 let child = CreateRequest {
1261 created_at: sim_clock.now(),
1262 execution_id: ExecutionId::Derived(child_execution_id.clone()),
1263 ffqn: FFQN_CHILD,
1264 params: Params::empty(),
1265 parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1266 metadata: concepts::ExecutionMetadata::empty(),
1267 scheduled_at: sim_clock.now(),
1268 retry_exp_backoff: Duration::ZERO,
1269 max_retries: 0,
1270 component_id: ComponentId::dummy_activity(),
1271 scheduled_by: None,
1272 };
1273 let current_time = sim_clock.now();
1274 let join_set = AppendRequest {
1275 created_at: current_time,
1276 event: ExecutionEventInner::HistoryEvent {
1277 event: HistoryEvent::JoinSetCreate {
1278 join_set_id: join_set_id.clone(),
1279 closing_strategy: ClosingStrategy::Complete,
1280 },
1281 },
1282 };
1283 let child_exec_req = AppendRequest {
1284 created_at: current_time,
1285 event: ExecutionEventInner::HistoryEvent {
1286 event: HistoryEvent::JoinSetRequest {
1287 join_set_id: join_set_id.clone(),
1288 request: JoinSetRequest::ChildExecutionRequest {
1289 child_execution_id: child_execution_id.clone(),
1290 },
1291 },
1292 },
1293 };
1294 let join_next = AppendRequest {
1295 created_at: current_time,
1296 event: ExecutionEventInner::HistoryEvent {
1297 event: HistoryEvent::JoinNext {
1298 join_set_id: join_set_id.clone(),
1299 run_expires_at: sim_clock.now(),
1300 closing: false,
1301 },
1302 },
1303 };
1304 db_pool
1305 .connection()
1306 .append_batch_create_new_execution(
1307 current_time,
1308 vec![join_set, child_exec_req, join_next],
1309 parent_execution_id.clone(),
1310 Version::new(2),
1311 vec![child],
1312 )
1313 .await
1314 .unwrap();
1315 }
1316
1317 let child_worker =
1318 Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));
1319
1320 tick_fn(
1322 ExecConfig {
1323 batch_size: 1,
1324 lock_expiry: LOCK_EXPIRY,
1325 tick_sleep: Duration::ZERO,
1326 component_id: ComponentId::dummy_activity(),
1327 task_limiter: None,
1328 },
1329 sim_clock.clone(),
1330 db_pool.clone(),
1331 child_worker,
1332 sim_clock.now(),
1333 )
1334 .await;
1335 if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
1336 sim_clock.move_time_forward(LOCK_EXPIRY).await;
1338 expired_timers_watcher::tick(&db_pool.connection(), sim_clock.now())
1339 .await
1340 .unwrap();
1341 }
1342 let child_log = db_pool
1343 .connection()
1344 .get(&ExecutionId::Derived(child_execution_id.clone()))
1345 .await
1346 .unwrap();
1347 assert!(child_log.pending_state.is_finished());
1348 assert_eq!(
1349 Version(2),
1350 child_log.next_version,
1351 "created = 0, locked = 1, with_single_result = 2"
1352 );
1353 assert_eq!(
1354 ExecutionEventInner::Finished {
1355 result: Err(expected_child_err),
1356 http_client_traces: None
1357 },
1358 child_log.last_event().event
1359 );
1360 let parent_log = db_pool
1361 .connection()
1362 .get(&parent_execution_id)
1363 .await
1364 .unwrap();
1365 assert_matches!(
1366 parent_log.pending_state,
1367 PendingState::PendingAt {
1368 scheduled_at
1369 } if scheduled_at == sim_clock.now(),
1370 "parent should be back to pending"
1371 );
1372 let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1373 parent_log.responses.last(),
1374 Some(JoinSetResponseEventOuter{
1375 created_at: at,
1376 event: JoinSetResponseEvent{
1377 join_set_id: found_join_set_id,
1378 event: JoinSetResponse::ChildExecutionFinished {
1379 child_execution_id: found_child_execution_id,
1380 finished_version,
1381 result: found_result,
1382 }
1383 }
1384 })
1385 if *at == sim_clock.now()
1386 => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1387 );
1388 assert_eq!(join_set_id, *found_join_set_id);
1389 assert_eq!(child_execution_id, *found_child_execution_id);
1390 assert_eq!(child_log.next_version, *child_finished_version);
1391 assert!(found_result.is_err());
1392
1393 db_pool.close().await.unwrap();
1394 }
1395
1396 #[derive(Clone, Debug)]
1397 struct SleepyWorker {
1398 duration: Duration,
1399 result: SupportedFunctionReturnValue,
1400 exported: [FunctionMetadata; 1],
1401 }
1402
1403 #[async_trait]
1404 impl Worker for SleepyWorker {
1405 async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1406 tokio::time::sleep(self.duration).await;
1407 WorkerResult::Ok(self.result.clone(), ctx.version, None)
1408 }
1409
1410 fn exported_functions(&self) -> &[FunctionMetadata] {
1411 &self.exported
1412 }
1413 }
1414
1415 #[expect(clippy::too_many_lines)]
1416 #[tokio::test]
1417 async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1418 set_up();
1419 let sim_clock = SimClock::default();
1420 let (_guard, db_pool) = Database::Memory.set_up().await;
1421 let lock_expiry = Duration::from_millis(100);
1422 let exec_config = ExecConfig {
1423 batch_size: 1,
1424 lock_expiry,
1425 tick_sleep: Duration::ZERO,
1426 component_id: ComponentId::dummy_activity(),
1427 task_limiter: None,
1428 };
1429
1430 let worker = Arc::new(SleepyWorker {
1431 duration: lock_expiry + Duration::from_millis(1), result: SupportedFunctionReturnValue::None,
1433 exported: [FunctionMetadata {
1434 ffqn: FFQN_SOME,
1435 parameter_types: ParameterTypes::default(),
1436 return_type: None,
1437 extension: None,
1438 submittable: true,
1439 }],
1440 });
1441 let execution_id = ExecutionId::generate();
1443 let timeout_duration = Duration::from_millis(300);
1444 let db_connection = db_pool.connection();
1445 db_connection
1446 .create(CreateRequest {
1447 created_at: sim_clock.now(),
1448 execution_id: execution_id.clone(),
1449 ffqn: FFQN_SOME,
1450 params: Params::empty(),
1451 parent: None,
1452 metadata: concepts::ExecutionMetadata::empty(),
1453 scheduled_at: sim_clock.now(),
1454 retry_exp_backoff: timeout_duration,
1455 max_retries: 1,
1456 component_id: ComponentId::dummy_activity(),
1457 scheduled_by: None,
1458 })
1459 .await
1460 .unwrap();
1461
1462 let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
1463 let executor = ExecTask::new(
1464 worker,
1465 exec_config,
1466 sim_clock.clone(),
1467 db_pool.clone(),
1468 ffqns,
1469 );
1470 let mut first_execution_progress = executor.tick(sim_clock.now()).await.unwrap();
1471 assert_eq!(1, first_execution_progress.executions.len());
1472 sim_clock.move_time_forward(lock_expiry).await;
1474 let now_after_first_lock_expiry = sim_clock.now();
1476 {
1477 debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1478 let cleanup_progress = executor.tick(now_after_first_lock_expiry).await.unwrap();
1479 assert!(cleanup_progress.executions.is_empty());
1480 }
1481 {
1482 let expired_locks =
1483 expired_timers_watcher::tick(&db_pool.connection(), now_after_first_lock_expiry)
1484 .await
1485 .unwrap()
1486 .expired_locks;
1487 assert_eq!(1, expired_locks);
1488 }
1489 assert!(
1490 !first_execution_progress
1491 .executions
1492 .pop()
1493 .unwrap()
1494 .1
1495 .is_finished()
1496 );
1497
1498 let execution_log = db_connection.get(&execution_id).await.unwrap();
1499 let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1500 assert_matches!(
1501 &execution_log.events.get(2).unwrap(),
1502 ExecutionEvent {
1503 event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
1504 created_at: at,
1505 backtrace_id: None,
1506 } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1507 );
1508 assert_eq!(
1509 PendingState::PendingAt {
1510 scheduled_at: expected_first_timeout_expiry
1511 },
1512 execution_log.pending_state
1513 );
1514 sim_clock.move_time_forward(timeout_duration).await;
1515 let now_after_first_timeout = sim_clock.now();
1516 debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1517
1518 let mut second_execution_progress = executor.tick(now_after_first_timeout).await.unwrap();
1519 assert_eq!(1, second_execution_progress.executions.len());
1520
1521 sim_clock.move_time_forward(lock_expiry).await;
1523 let now_after_second_lock_expiry = sim_clock.now();
1525 debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1526 {
1527 let cleanup_progress = executor.tick(now_after_second_lock_expiry).await.unwrap();
1528 assert!(cleanup_progress.executions.is_empty());
1529 }
1530 {
1531 let expired_locks =
1532 expired_timers_watcher::tick(&db_pool.connection(), now_after_second_lock_expiry)
1533 .await
1534 .unwrap()
1535 .expired_locks;
1536 assert_eq!(1, expired_locks);
1537 }
1538 assert!(
1539 !second_execution_progress
1540 .executions
1541 .pop()
1542 .unwrap()
1543 .1
1544 .is_finished()
1545 );
1546
1547 drop(db_connection);
1548 drop(executor);
1549 db_pool.close().await.unwrap();
1550 }
1551}