use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
use assert_matches::assert_matches;
use chrono::{DateTime, Utc};
use concepts::prefixed_ulid::RunId;
use concepts::storage::{
    AppendRequest, DbPool, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
    LockedExecution,
};
use concepts::time::ClockFn;
use concepts::{ComponentId, FinishedExecutionResult, FunctionMetadata, StrVariant};
use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
use concepts::{
    FinishedExecutionError,
    storage::{DbConnection, DbError, ExecutionEventInner, JoinSetResponse, Version},
};
use concepts::{JoinSetId, PermanentFailureKind};
use std::fmt::Display;
use std::{
    sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    },
    time::Duration,
};
use tokio::task::{AbortHandle, JoinHandle};
use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};

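/// Configuration for a single executor task: how long a lock is held
/// (`lock_expiry`), how long to sleep between ticks when idle (`tick_sleep`),
/// how many pending executions to lock per tick (`batch_size`), and an
/// optional semaphore limiting concurrently running executions (`task_limiter`).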
#[derive(Debug, Clone)]
pub struct ExecConfig {
    pub lock_expiry: Duration,
    pub tick_sleep: Duration,
    pub batch_size: u32,
    pub component_id: ComponentId,
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
    pub executor_id: ExecutorId,
}

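/// An executor bound to one [`Worker`]: it locks pending executions for the
/// worker's exported functions (`ffqns`) and runs them against the database.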
pub struct ExecTask<C: ClockFn> {
    worker: Arc<dyn Worker>,
    config: ExecConfig,
    clock_fn: C,
    db_pool: Arc<dyn DbPool>,
    ffqns: Arc<[FunctionFqn]>,
}

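/// Join handles of the worker tasks spawned by a single [`ExecTask::tick`] call.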
#[derive(derive_more::Debug, Default)]
pub struct ExecutionProgress {
    #[debug(skip)]
    #[allow(dead_code)]
    executions: Vec<(ExecutionId, JoinHandle<()>)>,
}

impl ExecutionProgress {
    #[cfg(feature = "test")]
    pub async fn wait_for_tasks(self) -> Result<usize, tokio::task::JoinError> {
        let execs = self.executions.len();
        for (_, handle) in self.executions {
            handle.await?;
        }
        Ok(execs)
    }
}

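/// Handle returned by [`ExecTask::spawn_new`]. [`ExecutorTaskHandle::close`]
/// requests a graceful shutdown and waits for the background task to finish;
/// dropping the handle aborts the task if it is still running.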
#[derive(derive_more::Debug)]
pub struct ExecutorTaskHandle {
    #[debug(skip)]
    is_closing: Arc<AtomicBool>,
    #[debug(skip)]
    abort_handle: AbortHandle,
    component_id: ComponentId,
    executor_id: ExecutorId,
}

impl ExecutorTaskHandle {
    #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
    pub async fn close(&self) {
        trace!("Gracefully closing");
        self.is_closing.store(true, Ordering::Relaxed);
        while !self.abort_handle.is_finished() {
            tokio::time::sleep(Duration::from_millis(1)).await;
        }
        debug!("Gracefully closed");
    }
}

impl Drop for ExecutorTaskHandle {
    #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
    fn drop(&mut self) {
        if self.abort_handle.is_finished() {
            return;
        }
        warn!("Aborting the executor task");
        self.abort_handle.abort();
    }
}

#[cfg(feature = "test")]
pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    extract_exported_ffqns_noext(worker)
}

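/// Collects the FFQNs exported by the worker; the executor only locks and
/// waits for executions that target these functions.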
fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    worker
        .exported_functions()
        .iter()
        .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
        .collect::<Arc<_>>()
}

impl<C: ClockFn + 'static> ExecTask<C> {
    #[cfg(feature = "test")]
    pub fn new(
        worker: Arc<dyn Worker>,
        config: ExecConfig,
        clock_fn: C,
        db_pool: Arc<dyn DbPool>,
        ffqns: Arc<[FunctionFqn]>,
    ) -> Self {
        Self {
            worker,
            config,
            clock_fn,
            db_pool,
            ffqns,
        }
    }

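    /// Spawns the executor loop on the Tokio runtime: each iteration calls
    /// [`Self::tick`], then calls `wait_for_pending` with `tick_sleep` before
    /// checking the shutdown flag again.
    ///
    /// A minimal usage sketch (assuming a `worker`, a `clock_fn` and a `db_pool`
    /// are already available; not compiled as a doctest):
    ///
    /// ```ignore
    /// let handle = ExecTask::spawn_new(
    ///     worker,
    ///     ExecConfig {
    ///         lock_expiry: Duration::from_secs(1),
    ///         tick_sleep: Duration::from_millis(100),
    ///         batch_size: 1,
    ///         component_id: ComponentId::dummy_activity(),
    ///         task_limiter: None,
    ///         executor_id: ExecutorId::generate(),
    ///     },
    ///     clock_fn,
    ///     db_pool,
    /// );
    /// // ... later, shut down gracefully:
    /// handle.close().await;
    /// ```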
    pub fn spawn_new(
        worker: Arc<dyn Worker>,
        config: ExecConfig,
        clock_fn: C,
        db_pool: Arc<dyn DbPool>,
    ) -> ExecutorTaskHandle {
        let is_closing = Arc::new(AtomicBool::default());
        let is_closing_inner = is_closing.clone();
        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
        let component_id = config.component_id.clone();
        let executor_id = config.executor_id;
        let abort_handle = tokio::spawn(async move {
            debug!(executor_id = %config.executor_id, component_id = %config.component_id, "Spawned executor");
            let task = Self {
                worker,
                config,
                db_pool,
                ffqns: ffqns.clone(),
                clock_fn: clock_fn.clone(),
            };
            loop {
                let _ = task.tick(clock_fn.now(), RunId::generate()).await;
                let executed_at = clock_fn.now();
                task.db_pool
                    .connection()
                    .wait_for_pending(executed_at, ffqns.clone(), task.config.tick_sleep)
                    .await;
                if is_closing_inner.load(Ordering::Relaxed) {
                    return;
                }
            }
        })
        .abort_handle();
        ExecutorTaskHandle {
            is_closing,
            abort_handle,
            component_id,
            executor_id,
        }
    }

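    /// Returns one slot per execution that may be locked in this tick: either
    /// `batch_size` empty slots, or as many permits as the optional
    /// `task_limiter` semaphore currently allows.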
    fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
        if let Some(task_limiter) = &self.config.task_limiter {
            let mut locks = Vec::new();
            for _ in 0..self.config.batch_size {
                if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
                    locks.push(Some(permit));
                } else {
                    break;
                }
            }
            locks
        } else {
            let mut vec = Vec::with_capacity(self.config.batch_size as usize);
            for _ in 0..self.config.batch_size {
                vec.push(None);
            }
            vec
        }
    }

    #[cfg(feature = "test")]
    pub async fn tick_test(
        &self,
        executed_at: DateTime<Utc>,
        run_id: RunId,
    ) -> Result<ExecutionProgress, ()> {
        self.tick(executed_at, run_id).await
    }

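    /// A single scheduling round: locks up to one batch of pending executions
    /// and spawns a task per locked execution that runs the worker and appends
    /// the resulting event. Lock acquisition errors are logged and surfaced as
    /// `Err(())`, which the spawned loop ignores.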
    #[instrument(level = Level::TRACE, name = "executor.tick", skip_all, fields(executor_id = %self.config.executor_id, component_id = %self.config.component_id))]
    async fn tick(
        &self,
        executed_at: DateTime<Utc>,
        run_id: RunId,
    ) -> Result<ExecutionProgress, ()> {
        let locked_executions = {
            let mut permits = self.acquire_task_permits();
            if permits.is_empty() {
                return Ok(ExecutionProgress::default());
            }
            let db_connection = self.db_pool.connection();
            let lock_expires_at = executed_at + self.config.lock_expiry;
            let locked_executions = db_connection
                .lock_pending(
                    permits.len(),
                    executed_at,
                    self.ffqns.clone(),
                    executed_at,
                    self.config.component_id.clone(),
                    self.config.executor_id,
                    lock_expires_at,
                    run_id,
                )
                .await
                .map_err(|err| {
                    warn!("lock_pending error {err:?}");
                })?;
            while permits.len() > locked_executions.len() {
                permits.pop();
            }
            assert_eq!(permits.len(), locked_executions.len());
            locked_executions.into_iter().zip(permits)
        };
        let execution_deadline = executed_at + self.config.lock_expiry;

        let mut executions = Vec::with_capacity(locked_executions.len());
        for (locked_execution, permit) in locked_executions {
            let execution_id = locked_execution.execution_id.clone();
            let join_handle = {
                let worker = self.worker.clone();
                let db_pool = self.db_pool.clone();
                let clock_fn = self.clock_fn.clone();
                let run_id = locked_execution.run_id;
                let worker_span = info_span!(parent: None, "worker",
                    "otel.name" = format!("worker {}", locked_execution.ffqn),
                    %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.config.executor_id, component_id = %self.config.component_id);
                locked_execution.metadata.enrich(&worker_span);
                tokio::spawn({
                    let worker_span2 = worker_span.clone();
                    async move {
                        let _permit = permit;
                        let res = Self::run_worker(
                            worker,
                            db_pool.as_ref(),
                            execution_deadline,
                            clock_fn,
                            locked_execution,
                            worker_span2,
                        )
                        .await;
                        if let Err(db_error) = res {
                            error!("Got DbError `{db_error:?}`, expecting watcher to mark execution as timed out");
                        }
                    }
                    .instrument(worker_span)
                })
            };
            executions.push((execution_id, join_handle));
        }
        Ok(ExecutionProgress { executions })
    }

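    /// Runs the worker for one locked execution and persists the outcome.
    /// Only database errors are propagated; in that case the caller expects
    /// the expired timers watcher to eventually time the execution out.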
    async fn run_worker(
        worker: Arc<dyn Worker>,
        db_pool: &dyn DbPool,
        execution_deadline: DateTime<Utc>,
        clock_fn: C,
        locked_execution: LockedExecution,
        worker_span: Span,
    ) -> Result<(), DbError> {
        debug!("Worker::run starting");
        trace!(
            version = %locked_execution.version,
            params = ?locked_execution.params,
            event_history = ?locked_execution.event_history,
            "Worker::run starting"
        );
        let can_be_retried = ExecutionLog::can_be_retried_after(
            locked_execution.temporary_event_count + 1,
            locked_execution.max_retries,
            locked_execution.retry_exp_backoff,
        );
        let unlock_expiry_on_limit_reached =
            ExecutionLog::compute_retry_duration_when_retrying_forever(
                locked_execution.temporary_event_count + 1,
                locked_execution.retry_exp_backoff,
            );
        let ctx = WorkerContext {
            execution_id: locked_execution.execution_id.clone(),
            metadata: locked_execution.metadata,
            ffqn: locked_execution.ffqn,
            params: locked_execution.params,
            event_history: locked_execution.event_history,
            responses: locked_execution
                .responses
                .into_iter()
                .map(|outer| outer.event)
                .collect(),
            version: locked_execution.version,
            execution_deadline,
            can_be_retried: can_be_retried.is_some(),
            run_id: locked_execution.run_id,
            worker_span,
        };
        let worker_result = worker.run(ctx).await;
        trace!(?worker_result, "Worker::run finished");
        let result_obtained_at = clock_fn.now();
        match Self::worker_result_to_execution_event(
            locked_execution.execution_id,
            worker_result,
            result_obtained_at,
            locked_execution.parent,
            can_be_retried,
            unlock_expiry_on_limit_reached,
        )? {
            Some(append) => {
                let db_connection = db_pool.connection();
                trace!("Appending {append:?}");
                append.append(db_connection.as_ref()).await
            }
            None => Ok(()),
        }
    }

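    /// Maps a [`WorkerResult`] to the event that should be appended, together
    /// with an optional response for the parent execution. Returns `Ok(None)`
    /// when the database was already updated by the worker or the watcher.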
    fn worker_result_to_execution_event(
        execution_id: ExecutionId,
        worker_result: WorkerResult,
        result_obtained_at: DateTime<Utc>,
        parent: Option<(ExecutionId, JoinSetId)>,
        can_be_retried: Option<Duration>,
        unlock_expiry_on_limit_reached: Duration,
    ) -> Result<Option<Append>, DbError> {
        Ok(match worker_result {
            WorkerResult::Ok(result, new_version, http_client_traces) => {
                info!(
                    "Execution finished: {}",
                    result.as_pending_state_finished_result()
                );
                let child_finished =
                    parent.map(
                        |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
                            parent_execution_id,
                            parent_join_set,
                            result: Ok(result.clone()),
                        },
                    );
                let primary_event = AppendRequest {
                    created_at: result_obtained_at,
                    event: ExecutionEventInner::Finished {
                        result: Ok(result),
                        http_client_traces,
                    },
                };

                Some(Append {
                    created_at: result_obtained_at,
                    primary_event,
                    execution_id,
                    version: new_version,
                    child_finished,
                })
            }
            WorkerResult::DbUpdatedByWorkerOrWatcher => None,
            WorkerResult::Err(err) => {
                let reason_full = err.to_string();
                let (primary_event, child_finished, version) = match err {
                    WorkerError::TemporaryTimeout {
                        http_client_traces,
                        version,
                    } => {
                        if let Some(duration) = can_be_retried {
                            let backoff_expires_at = result_obtained_at + duration;
                            info!(
                                "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
                            );
                            (
                                ExecutionEventInner::TemporarilyTimedOut {
                                    backoff_expires_at,
                                    http_client_traces,
                                },
                                None,
                                version,
                            )
                        } else {
                            info!("Permanent timeout");
                            let result = Err(FinishedExecutionError::PermanentTimeout);
                            let child_finished =
                                parent.map(|(parent_execution_id, parent_join_set)| {
                                    ChildFinishedResponse {
                                        parent_execution_id,
                                        parent_join_set,
                                        result: result.clone(),
                                    }
                                });
                            (
                                ExecutionEventInner::Finished {
                                    result,
                                    http_client_traces,
                                },
                                child_finished,
                                version,
                            )
                        }
                    }
                    WorkerError::DbError(db_error) => {
                        return Err(db_error);
                    }
                    WorkerError::ActivityTrap {
                        reason: reason_inner,
                        trap_kind,
                        detail,
                        version,
                        http_client_traces,
                    } => retry_or_fail(
                        result_obtained_at,
                        parent,
                        can_be_retried,
                        reason_full,
                        reason_inner,
                        trap_kind,
                        detail,
                        version,
                        http_client_traces,
                    ),
                    WorkerError::ActivityPreopenedDirError {
                        reason_kind,
                        reason_inner,
                        version,
                    } => retry_or_fail(
                        result_obtained_at,
                        parent,
                        can_be_retried,
                        reason_full,
                        reason_inner,
                        reason_kind,
                        None,
                        version,
                        None,
                    ),
                    WorkerError::ActivityReturnedError {
                        detail,
                        version,
                        http_client_traces,
                    } => {
                        let duration = can_be_retried.expect(
                            "ActivityReturnedError must not be returned when retries are exhausted",
                        );
                        let expires_at = result_obtained_at + duration;
                        debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
                        (
                            ExecutionEventInner::TemporarilyFailed {
                                backoff_expires_at: expires_at,
                                reason_full: StrVariant::Static("activity returned error"),
                                reason_inner: StrVariant::Static("activity returned error"),
                                detail,
                                http_client_traces,
                            },
                            None,
                            version,
                        )
                    }
                    WorkerError::LimitReached {
                        reason: inner_reason,
                        version: new_version,
                    } => {
                        let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
                        warn!(
                            "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
                        );
                        (
                            ExecutionEventInner::Unlocked {
                                backoff_expires_at: expires_at,
                                reason: StrVariant::from(reason_full),
                            },
                            None,
                            new_version,
                        )
                    }
                    WorkerError::FatalError(fatal_error, version) => {
                        info!("Fatal worker error - {fatal_error:?}");
                        let result = Err(FinishedExecutionError::from(fatal_error));
                        let child_finished =
                            parent.map(|(parent_execution_id, parent_join_set)| {
                                ChildFinishedResponse {
                                    parent_execution_id,
                                    parent_join_set,
                                    result: result.clone(),
                                }
                            });
                        (
                            ExecutionEventInner::Finished {
                                result,
                                http_client_traces: None,
                            },
                            child_finished,
                            version,
                        )
                    }
                };
                Some(Append {
                    created_at: result_obtained_at,
                    primary_event: AppendRequest {
                        created_at: result_obtained_at,
                        event: primary_event,
                    },
                    execution_id,
                    version,
                    child_finished,
                })
            }
        })
    }
}

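/// Shared handling for activity failures: schedules a temporary failure with a
/// backoff when retries remain, otherwise finishes the execution as a permanent
/// failure (reported with [`PermanentFailureKind::ActivityTrap`]) and, for child
/// executions, prepares the response to the parent.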
#[expect(clippy::too_many_arguments)]
fn retry_or_fail(
    result_obtained_at: DateTime<Utc>,
    parent: Option<(ExecutionId, JoinSetId)>,
    can_be_retried: Option<Duration>,
    reason_full: String,
    reason_inner: String,
    err_kind_for_logs: impl Display,
    detail: Option<String>,
    version: Version,
    http_client_traces: Option<Vec<concepts::storage::http_client_trace::HttpClientTrace>>,
) -> (ExecutionEventInner, Option<ChildFinishedResponse>, Version) {
    if let Some(duration) = can_be_retried {
        let expires_at = result_obtained_at + duration;
        debug!(
            "Retrying activity with `{err_kind_for_logs}` execution after {duration:?} at {expires_at}"
        );
        (
            ExecutionEventInner::TemporarilyFailed {
                backoff_expires_at: expires_at,
                reason_full: StrVariant::from(reason_full),
                reason_inner: StrVariant::from(reason_inner),
                detail,
                http_client_traces,
            },
            None,
            version,
        )
    } else {
        info!("Activity with `{err_kind_for_logs}` marked as permanent failure - {reason_inner}");
        let result = Err(FinishedExecutionError::PermanentFailure {
            reason_inner,
            reason_full,
            kind: PermanentFailureKind::ActivityTrap,
            detail,
        });
        let child_finished =
            parent.map(
                |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
                    parent_execution_id,
                    parent_join_set,
                    result: result.clone(),
                },
            );
        (
            ExecutionEventInner::Finished {
                result,
                http_client_traces,
            },
            child_finished,
            version,
        )
    }
}

#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    pub(crate) parent_execution_id: ExecutionId,
    pub(crate) parent_join_set: JoinSetId,
    pub(crate) result: FinishedExecutionResult,
}

#[derive(Debug, Clone)]
pub(crate) struct Append {
    pub(crate) created_at: DateTime<Utc>,
    pub(crate) primary_event: AppendRequest,
    pub(crate) execution_id: ExecutionId,
    pub(crate) version: Version,
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}

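/// Persists the outcome produced by the executor: a finished child execution is
/// appended together with the response delivered to its parent's join set in a
/// single batch; any other event is appended on its own.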
impl Append {
    pub(crate) async fn append(self, db_connection: &dyn DbConnection) -> Result<(), DbError> {
        if let Some(child_finished) = self.child_finished {
            assert_matches!(
                &self.primary_event,
                AppendRequest {
                    event: ExecutionEventInner::Finished { .. },
                    ..
                }
            );
            let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
            db_connection
                .append_batch_respond_to_parent(
                    derived.clone(),
                    self.created_at,
                    vec![self.primary_event],
                    self.version.clone(),
                    child_finished.parent_execution_id,
                    JoinSetResponseEventOuter {
                        created_at: self.created_at,
                        event: JoinSetResponseEvent {
                            join_set_id: child_finished.parent_join_set,
                            event: JoinSetResponse::ChildExecutionFinished {
                                child_execution_id: derived,
                                finished_version: self.version,
                                result: child_finished.result,
                            },
                        },
                    },
                )
                .await?;
        } else {
            db_connection
                .append_batch(
                    self.created_at,
                    vec![self.primary_event],
                    self.execution_id,
                    self.version,
                )
                .await?;
        }
        Ok(())
    }
}

#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    impl SimpleWorker {
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(IndexMap::from([(
                Version::new(2),
                (vec![], res),
            )]))))
        }

        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            Self {
                worker_results_rev: self.worker_results_rev,
                exported: [FunctionMetadata {
                    ffqn: ffqn.clone(),
                    parameter_types: ParameterTypes::default(),
                    return_type: None,
                    extension: None,
                    submittable: true,
                }],
                ffqn,
            }
        }

        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: [FunctionMetadata {
                    ffqn: FFQN_SOME,
                    parameter_types: ParameterTypes::default(),
                    return_type: None,
                    extension: None,
                    submittable: true,
                }],
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            let (expected_version, (expected_eh, worker_result)) =
                self.worker_results_rev.lock().unwrap().pop().unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}

#[cfg(test)]
mod tests {
    use self::simple_worker::SimpleWorker;
    use super::*;
    use crate::worker::FatalError;
    use crate::{expired_timers_watcher, worker::WorkerResult};
    use assert_matches::assert_matches;
    use async_trait::async_trait;
    use concepts::storage::{CreateRequest, JoinSetRequest};
    use concepts::storage::{ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState};
    use concepts::time::Now;
    use concepts::{
        ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, StrVariant,
        SupportedFunctionReturnValue, TrapKind,
    };
    use db_tests::Database;
    use indexmap::IndexMap;
    use simple_worker::FFQN_SOME;
    use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
    use test_utils::set_up;
    use test_utils::sim_clock::{ConstClock, SimClock};

    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");

    async fn tick_fn<W: Worker + Debug, C: ClockFn + 'static>(
        config: ExecConfig,
        clock_fn: C,
        db_pool: Arc<dyn DbPool>,
        worker: Arc<W>,
        executed_at: DateTime<Utc>,
    ) -> ExecutionProgress {
        trace!("Ticking with {worker:?}");
        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
        let executor = ExecTask::new(worker, config, clock_fn, db_pool, ffqns);
        let mut execution_progress = executor.tick(executed_at, RunId::generate()).await.unwrap();
        loop {
            execution_progress
                .executions
                .retain(|(_, abort_handle)| !abort_handle.is_finished());
            if execution_progress.executions.is_empty() {
                return execution_progress;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    }

    #[tokio::test]
    async fn execute_simple_lifecycle_tick_based_mem() {
        let created_at = Now.now();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
        db_pool.close().await.unwrap();
    }

    #[tokio::test]
    async fn execute_simple_lifecycle_tick_based_sqlite() {
        let created_at = Now.now();
        let (_guard, db_pool) = Database::Sqlite.set_up().await;
        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
        db_pool.close().await.unwrap();
    }

    async fn execute_simple_lifecycle_tick_based<C: ClockFn + 'static>(
        pool: Arc<dyn DbPool>,
        clock_fn: C,
    ) {
        set_up();
        let created_at = clock_fn.now();
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::from_millis(100),
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
        };

        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            pool,
            exec_config,
            Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
                SupportedFunctionReturnValue::None,
                Version::new(2),
                None,
            ))),
            tick_fn,
        )
        .await;
        assert_matches!(
            execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Ok(SupportedFunctionReturnValue::None),
                    http_client_traces: None
                },
                created_at: _,
                backtrace_id: None,
            }
        );
    }

    #[tokio::test]
    async fn stochastic_execute_simple_lifecycle_task_based_mem() {
        set_up();
        let created_at = Now.now();
        let clock_fn = ConstClock(created_at);
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
        };

        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
            SupportedFunctionReturnValue::None,
            Version::new(2),
            None,
        )));
        let exec_task = ExecTask::spawn_new(
            worker.clone(),
            exec_config.clone(),
            clock_fn,
            db_pool.clone(),
        );

        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            db_pool.clone(),
            exec_config,
            worker,
            |_, _, _, _, _| async {
                tokio::time::sleep(Duration::from_secs(1)).await;
                ExecutionProgress::default()
            },
        )
        .await;
        exec_task.close().await;
        db_pool.close().await.unwrap();
        assert_matches!(
            execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Ok(SupportedFunctionReturnValue::None),
                    http_client_traces: None
                },
                created_at: _,
                backtrace_id: None,
            }
        );
    }

    struct CreateAndTickConfig {
        execution_id: ExecutionId,
        created_at: DateTime<Utc>,
        max_retries: u32,
        executed_at: DateTime<Utc>,
        retry_exp_backoff: Duration,
    }

    async fn create_and_tick<
        W: Worker,
        C: ClockFn,
        T: FnMut(ExecConfig, C, Arc<dyn DbPool>, Arc<W>, DateTime<Utc>) -> F,
        F: Future<Output = ExecutionProgress>,
    >(
        config: CreateAndTickConfig,
        clock_fn: C,
        db_pool: Arc<dyn DbPool>,
        exec_config: ExecConfig,
        worker: Arc<W>,
        mut tick: T,
    ) -> ExecutionLog {
        let db_connection = db_pool.connection();
        db_connection
            .create(CreateRequest {
                created_at: config.created_at,
                execution_id: config.execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: config.created_at,
                retry_exp_backoff: config.retry_exp_backoff,
                max_retries: config.max_retries,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();
        tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
        debug!("Execution history after tick: {execution_log:?}");
        assert_matches!(
            execution_log.events.first().unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Created { .. },
                created_at: actually_created_at,
                backtrace_id: None,
            }
            if config.created_at == *actually_created_at
        );
        let locked_at = assert_matches!(
            execution_log.events.get(1).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: locked_at,
                backtrace_id: None,
            } if config.created_at <= *locked_at
            => *locked_at
        );
        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
            event: _,
            created_at: executed_at,
            backtrace_id: None,
        } if *executed_at >= locked_at);
        execution_log
    }

    #[tokio::test]
    async fn activity_trap_should_trigger_an_execution_retry() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
        };
        let expected_reason = "error reason";
        let expected_detail = "error detail";
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: Some(expected_detail.to_string()),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        let retry_exp_backoff = Duration::from_millis(100);
        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at: sim_clock.now(),
                max_retries: 1,
                executed_at: sim_clock.now(),
                retry_exp_backoff,
            },
            sim_clock.clone(),
            db_pool.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        assert_eq!(3, execution_log.events.len());
        {
            let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
                &execution_log.events.get(2).unwrap(),
                ExecutionEvent {
                    event: ExecutionEventInner::TemporarilyFailed {
                        reason_inner,
                        reason_full,
                        detail,
                        backoff_expires_at,
                        http_client_traces: None,
                    },
                    created_at: at,
                    backtrace_id: None,
                }
                => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
            );
            assert_eq!(expected_reason, reason_inner.deref());
            assert_eq!(
                format!("activity trap: {expected_reason}"),
                reason_full.deref()
            );
            assert_eq!(Some(expected_detail), detail.as_deref());
            assert_eq!(at, sim_clock.now());
            assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
        }
        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
            std::sync::Mutex::new(IndexMap::from([(
                Version::new(4),
                (
                    vec![],
                    WorkerResult::Ok(SupportedFunctionReturnValue::None, Version::new(4), None),
                ),
            )])),
        )));
        assert!(
            tick_fn(
                exec_config.clone(),
                sim_clock.clone(),
                db_pool.clone(),
                worker.clone(),
                sim_clock.now(),
            )
            .await
            .executions
            .is_empty()
        );
        sim_clock.move_time_forward(retry_exp_backoff);
        tick_fn(
            exec_config,
            sim_clock.clone(),
            db_pool.clone(),
            worker,
            sim_clock.now(),
        )
        .await;
        let execution_log = {
            let db_connection = db_pool.connection();
            db_connection
                .get(&execution_log.execution_id)
                .await
                .unwrap()
        };
        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
        assert_matches!(
            execution_log.events.get(3).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: at,
                backtrace_id: None,
            } if *at == sim_clock.now()
        );
        assert_matches!(
            execution_log.events.get(4).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Ok(SupportedFunctionReturnValue::None),
                    http_client_traces: None
                },
                created_at: finished_at,
                backtrace_id: None,
            } if *finished_at == sim_clock.now()
        );
        db_pool.close().await.unwrap();
    }

    #[tokio::test]
    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
        set_up();
        let created_at = Now.now();
        let clock_fn = ConstClock(created_at);
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
        };

        let expected_reason = "error reason";
        let expected_detail = "error detail";
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: Some(expected_detail.to_string()),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            db_pool.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        assert_eq!(3, execution_log.events.len());
        let (reason, kind, detail) = assert_matches!(
            &execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished{
                    result: Err(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_}),
                    http_client_traces: None
                },
                created_at: at,
                backtrace_id: None,
            } if *at == created_at
            => (reason_inner, kind, detail)
        );
        assert_eq!(expected_reason, *reason);
        assert_eq!(Some(expected_detail), detail.as_deref());
        assert_eq!(PermanentFailureKind::ActivityTrap, *kind);

        db_pool.close().await.unwrap();
    }

    #[tokio::test]
    async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
        let worker_error = WorkerError::ActivityTrap {
            reason: "error reason".to_string(),
            trap_kind: TrapKind::Trap,
            detail: Some("detail".to_string()),
            version: Version::new(2),
            http_client_traces: None,
        };
        let expected_child_err = FinishedExecutionError::PermanentFailure {
            reason_full: "activity trap: error reason".to_string(),
            reason_inner: "error reason".to_string(),
            kind: PermanentFailureKind::ActivityTrap,
            detail: Some("detail".to_string()),
        };
        child_execution_permanently_failed_should_notify_parent(
            WorkerResult::Err(worker_error),
            expected_child_err,
        )
        .await;
    }

    #[tokio::test]
    async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
        let expected_child_err = FinishedExecutionError::PermanentTimeout;
        child_execution_permanently_failed_should_notify_parent(
            WorkerResult::DbUpdatedByWorkerOrWatcher,
            expected_child_err,
        )
        .await;
    }

    #[tokio::test]
    async fn child_execution_permanently_failed_should_notify_parent_unhandled_child() {
        let parent_id = ExecutionId::from_parts(1, 1);
        let join_set_id_outer =
            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("outer")).unwrap();
        let root_cause_id = parent_id.next_level(&join_set_id_outer);
        let join_set_id_inner =
            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("inner")).unwrap();
        let child_execution_id = root_cause_id.next_level(&join_set_id_inner);
        let worker_error = WorkerError::FatalError(
            FatalError::UnhandledChildExecutionError {
                child_execution_id: child_execution_id.clone(),
                root_cause_id: root_cause_id.clone(),
            },
            Version::new(2),
        );
        let expected_child_err = FinishedExecutionError::UnhandledChildExecutionError {
            child_execution_id,
            root_cause_id,
        };
        child_execution_permanently_failed_should_notify_parent(
            WorkerResult::Err(worker_error),
            expected_child_err,
        )
        .await;
    }

    async fn child_execution_permanently_failed_should_notify_parent(
        worker_result: WorkerResult,
        expected_child_err: FinishedExecutionError,
    ) {
        use concepts::storage::JoinSetResponseEventOuter;
        const LOCK_EXPIRY: Duration = Duration::from_secs(1);

        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;

        let parent_worker = Arc::new(SimpleWorker::with_single_result(
            WorkerResult::DbUpdatedByWorkerOrWatcher,
        ));
        let parent_execution_id = ExecutionId::generate();
        db_pool
            .connection()
            .create(CreateRequest {
                created_at: sim_clock.now(),
                execution_id: parent_execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: Duration::ZERO,
                max_retries: 0,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();
        tick_fn(
            ExecConfig {
                batch_size: 1,
                lock_expiry: LOCK_EXPIRY,
                tick_sleep: Duration::ZERO,
                component_id: ComponentId::dummy_activity(),
                task_limiter: None,
                executor_id: ExecutorId::generate(),
            },
            sim_clock.clone(),
            db_pool.clone(),
            parent_worker,
            sim_clock.now(),
        )
        .await;

        let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
        let child_execution_id = parent_execution_id.next_level(&join_set_id);
        {
            let child = CreateRequest {
                created_at: sim_clock.now(),
                execution_id: ExecutionId::Derived(child_execution_id.clone()),
                ffqn: FFQN_CHILD,
                params: Params::empty(),
                parent: Some((parent_execution_id.clone(), join_set_id.clone())),
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: Duration::ZERO,
                max_retries: 0,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            };
            let current_time = sim_clock.now();
            let join_set = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinSetCreate {
                        join_set_id: join_set_id.clone(),
                        closing_strategy: ClosingStrategy::Complete,
                    },
                },
            };
            let child_exec_req = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinSetRequest {
                        join_set_id: join_set_id.clone(),
                        request: JoinSetRequest::ChildExecutionRequest {
                            child_execution_id: child_execution_id.clone(),
                        },
                    },
                },
            };
            let join_next = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinNext {
                        join_set_id: join_set_id.clone(),
                        run_expires_at: sim_clock.now(),
                        closing: false,
                        requested_ffqn: Some(FFQN_CHILD),
                    },
                },
            };
            db_pool
                .connection()
                .append_batch_create_new_execution(
                    current_time,
                    vec![join_set, child_exec_req, join_next],
                    parent_execution_id.clone(),
                    Version::new(2),
                    vec![child],
                )
                .await
                .unwrap();
        }

        let child_worker =
            Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));

        tick_fn(
            ExecConfig {
                batch_size: 1,
                lock_expiry: LOCK_EXPIRY,
                tick_sleep: Duration::ZERO,
                component_id: ComponentId::dummy_activity(),
                task_limiter: None,
                executor_id: ExecutorId::generate(),
            },
            sim_clock.clone(),
            db_pool.clone(),
            child_worker,
            sim_clock.now(),
        )
        .await;
        if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
            sim_clock.move_time_forward(LOCK_EXPIRY);
            expired_timers_watcher::tick(db_pool.connection().as_ref(), sim_clock.now())
                .await
                .unwrap();
        }
        let child_log = db_pool
            .connection()
            .get(&ExecutionId::Derived(child_execution_id.clone()))
            .await
            .unwrap();
        assert!(child_log.pending_state.is_finished());
        assert_eq!(
            Version(2),
            child_log.next_version,
            "created = 0, locked = 1, with_single_result = 2"
        );
        assert_eq!(
            ExecutionEventInner::Finished {
                result: Err(expected_child_err),
                http_client_traces: None
            },
            child_log.last_event().event
        );
        let parent_log = db_pool
            .connection()
            .get(&parent_execution_id)
            .await
            .unwrap();
        assert_matches!(
            parent_log.pending_state,
            PendingState::PendingAt {
                scheduled_at
            } if scheduled_at == sim_clock.now(),
            "parent should be back to pending"
        );
        let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
            parent_log.responses.last(),
            Some(JoinSetResponseEventOuter{
                created_at: at,
                event: JoinSetResponseEvent{
                    join_set_id: found_join_set_id,
                    event: JoinSetResponse::ChildExecutionFinished {
                        child_execution_id: found_child_execution_id,
                        finished_version,
                        result: found_result,
                    }
                }
            })
            if *at == sim_clock.now()
            => (found_join_set_id, found_child_execution_id, finished_version, found_result)
        );
        assert_eq!(join_set_id, *found_join_set_id);
        assert_eq!(child_execution_id, *found_child_execution_id);
        assert_eq!(child_log.next_version, *child_finished_version);
        assert!(found_result.is_err());

        db_pool.close().await.unwrap();
    }

    #[derive(Clone, Debug)]
    struct SleepyWorker {
        duration: Duration,
        result: SupportedFunctionReturnValue,
        exported: [FunctionMetadata; 1],
    }

    #[async_trait]
    impl Worker for SleepyWorker {
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            tokio::time::sleep(self.duration).await;
            WorkerResult::Ok(self.result.clone(), ctx.version, None)
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }

    #[tokio::test]
    async fn hanging_lock_should_be_cleaned_and_execution_retried() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let lock_expiry = Duration::from_millis(100);
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry,
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
        };

        let worker = Arc::new(SleepyWorker {
            duration: lock_expiry + Duration::from_millis(1),
            result: SupportedFunctionReturnValue::None,
            exported: [FunctionMetadata {
                ffqn: FFQN_SOME,
                parameter_types: ParameterTypes::default(),
                return_type: None,
                extension: None,
                submittable: true,
            }],
        });
        let execution_id = ExecutionId::generate();
        let timeout_duration = Duration::from_millis(300);
        let db_connection = db_pool.connection();
        db_connection
            .create(CreateRequest {
                created_at: sim_clock.now(),
                execution_id: execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: timeout_duration,
                max_retries: 1,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();

        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
        let executor = ExecTask::new(
            worker,
            exec_config,
            sim_clock.clone(),
            db_pool.clone(),
            ffqns,
        );
        let mut first_execution_progress = executor
            .tick(sim_clock.now(), RunId::generate())
            .await
            .unwrap();
        assert_eq!(1, first_execution_progress.executions.len());
        sim_clock.move_time_forward(lock_expiry);
        let now_after_first_lock_expiry = sim_clock.now();
        {
            debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
            let cleanup_progress = executor
                .tick(now_after_first_lock_expiry, RunId::generate())
                .await
                .unwrap();
            assert!(cleanup_progress.executions.is_empty());
        }
        {
            let expired_locks = expired_timers_watcher::tick(
                db_pool.connection().as_ref(),
                now_after_first_lock_expiry,
            )
            .await
            .unwrap()
            .expired_locks;
            assert_eq!(1, expired_locks);
        }
        assert!(
            !first_execution_progress
                .executions
                .pop()
                .unwrap()
                .1
                .is_finished()
        );

        let execution_log = db_connection.get(&execution_id).await.unwrap();
        let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
        assert_matches!(
            &execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
                created_at: at,
                backtrace_id: None,
            } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
        );
        assert_eq!(
            PendingState::PendingAt {
                scheduled_at: expected_first_timeout_expiry
            },
            execution_log.pending_state
        );
        sim_clock.move_time_forward(timeout_duration);
        let now_after_first_timeout = sim_clock.now();
        debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");

        let mut second_execution_progress = executor
            .tick(now_after_first_timeout, RunId::generate())
            .await
            .unwrap();
        assert_eq!(1, second_execution_progress.executions.len());

        sim_clock.move_time_forward(lock_expiry);
        let now_after_second_lock_expiry = sim_clock.now();
        debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
        {
            let cleanup_progress = executor
                .tick(now_after_second_lock_expiry, RunId::generate())
                .await
                .unwrap();
            assert!(cleanup_progress.executions.is_empty());
        }
        {
            let expired_locks = expired_timers_watcher::tick(
                db_pool.connection().as_ref(),
                now_after_second_lock_expiry,
            )
            .await
            .unwrap()
            .expired_locks;
            assert_eq!(1, expired_locks);
        }
        assert!(
            !second_execution_progress
                .executions
                .pop()
                .unwrap()
                .1
                .is_finished()
        );

        drop(db_connection);
        drop(executor);
        db_pool.close().await.unwrap();
    }
}