use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
use assert_matches::assert_matches;
use chrono::{DateTime, Utc};
use concepts::prefixed_ulid::RunId;
use concepts::storage::{
    AppendRequest, DbPool, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
    LockedExecution,
};
use concepts::time::ClockFn;
use concepts::{ComponentId, FinishedExecutionResult, FunctionMetadata, StrVariant};
use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
use concepts::{
    FinishedExecutionError,
    storage::{DbConnection, DbError, ExecutionEventInner, JoinSetResponse, Version},
};
use concepts::{JoinSetId, PermanentFailureKind};
use std::fmt::Display;
use std::{
    sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    },
    time::Duration,
};
use tokio::task::{AbortHandle, JoinHandle};
use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};

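/// Executor configuration: how many executions are locked per tick (`batch_size`),
/// how long a lock lasts (`lock_expiry`), how long to sleep between ticks, and an
/// optional semaphore limiting the number of concurrently running executions.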
#[derive(Debug, Clone)]
pub struct ExecConfig {
    pub lock_expiry: Duration,
    pub tick_sleep: Duration,
    pub batch_size: u32,
    pub component_id: ComponentId,
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
    pub executor_id: ExecutorId,
}

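/// Executor that locks pending executions matching `ffqns` and runs them on `worker`.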
pub struct ExecTask<C: ClockFn> {
    worker: Arc<dyn Worker>,
    config: ExecConfig,
    clock_fn: C,
    db_pool: Arc<dyn DbPool>,
    ffqns: Arc<[FunctionFqn]>,
}

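/// Executions spawned by a single tick; tests use it to await their completion.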
#[derive(derive_more::Debug, Default)]
pub struct ExecutionProgress {
    #[debug(skip)]
    #[allow(dead_code)]
    executions: Vec<(ExecutionId, JoinHandle<()>)>,
}

impl ExecutionProgress {
    #[cfg(feature = "test")]
    pub async fn wait_for_tasks(self) -> Result<usize, tokio::task::JoinError> {
        let execs = self.executions.len();
        for (_, handle) in self.executions {
            handle.await?;
        }
        Ok(execs)
    }
}

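/// Handle to a spawned executor loop: `close` requests a graceful shutdown and waits for the
/// task to finish, while dropping the handle aborts a still-running task.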
#[derive(derive_more::Debug)]
pub struct ExecutorTaskHandle {
    #[debug(skip)]
    is_closing: Arc<AtomicBool>,
    #[debug(skip)]
    abort_handle: AbortHandle,
    component_id: ComponentId,
    executor_id: ExecutorId,
}

impl ExecutorTaskHandle {
    #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
    pub async fn close(&self) {
        trace!("Gracefully closing");
        self.is_closing.store(true, Ordering::Relaxed);
        while !self.abort_handle.is_finished() {
            tokio::time::sleep(Duration::from_millis(1)).await;
        }
        debug!("Gracefully closed");
    }
}

impl Drop for ExecutorTaskHandle {
    #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
    fn drop(&mut self) {
        if self.abort_handle.is_finished() {
            return;
        }
        warn!("Aborting the executor task");
        self.abort_handle.abort();
    }
}

#[cfg(feature = "test")]
pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    extract_exported_ffqns_noext(worker)
}

fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    worker
        .exported_functions()
        .iter()
        .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
        .collect::<Arc<_>>()
}

impl<C: ClockFn + 'static> ExecTask<C> {
    #[cfg(feature = "test")]
    pub fn new(
        worker: Arc<dyn Worker>,
        config: ExecConfig,
        clock_fn: C,
        db_pool: Arc<dyn DbPool>,
        ffqns: Arc<[FunctionFqn]>,
    ) -> Self {
        Self {
            worker,
            config,
            clock_fn,
            db_pool,
            ffqns,
        }
    }

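    /// Spawns the executor loop: each iteration ticks once, then waits for newly pending
    /// executions (bounded by `tick_sleep`) before checking whether `close` was requested.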
    pub fn spawn_new(
        worker: Arc<dyn Worker>,
        config: ExecConfig,
        clock_fn: C,
        db_pool: Arc<dyn DbPool>,
    ) -> ExecutorTaskHandle {
        let is_closing = Arc::new(AtomicBool::default());
        let is_closing_inner = is_closing.clone();
        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
        let component_id = config.component_id.clone();
        let executor_id = config.executor_id;
        let abort_handle = tokio::spawn(async move {
            debug!(executor_id = %config.executor_id, component_id = %config.component_id, "Spawned executor");
            let task = Self {
                worker,
                config,
                db_pool,
                ffqns: ffqns.clone(),
                clock_fn: clock_fn.clone(),
            };
            loop {
                let _ = task.tick(clock_fn.now(), RunId::generate()).await;
                let executed_at = clock_fn.now();
                task.db_pool
                    .connection()
                    .wait_for_pending(executed_at, ffqns.clone(), task.config.tick_sleep)
                    .await;
                if is_closing_inner.load(Ordering::Relaxed) {
                    return;
                }
            }
        })
        .abort_handle();
        ExecutorTaskHandle {
            is_closing,
            abort_handle,
            component_id,
            executor_id,
        }
    }

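    /// Returns one slot per execution that may be locked in this tick: up to `batch_size`
    /// owned permits when a `task_limiter` is configured, otherwise `batch_size` `None` placeholders.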
    fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
        if let Some(task_limiter) = &self.config.task_limiter {
            let mut locks = Vec::new();
            for _ in 0..self.config.batch_size {
                if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
                    locks.push(Some(permit));
                } else {
                    break;
                }
            }
            locks
        } else {
            let mut vec = Vec::with_capacity(self.config.batch_size as usize);
            for _ in 0..self.config.batch_size {
                vec.push(None);
            }
            vec
        }
    }

    #[cfg(feature = "test")]
    pub async fn tick_test(
        &self,
        executed_at: DateTime<Utc>,
        run_id: RunId,
    ) -> Result<ExecutionProgress, ()> {
        self.tick(executed_at, run_id).await
    }

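    /// Locks up to one pending execution per available permit and spawns a worker task for each of them.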
    #[instrument(level = Level::TRACE, name = "executor.tick", skip_all, fields(executor_id = %self.config.executor_id, component_id = %self.config.component_id))]
    async fn tick(
        &self,
        executed_at: DateTime<Utc>,
        run_id: RunId,
    ) -> Result<ExecutionProgress, ()> {
        let locked_executions = {
            let mut permits = self.acquire_task_permits();
            if permits.is_empty() {
                return Ok(ExecutionProgress::default());
            }
            let db_connection = self.db_pool.connection();
            let lock_expires_at = executed_at + self.config.lock_expiry;
            let locked_executions = db_connection
                .lock_pending(
                    permits.len(),
                    executed_at,
                    self.ffqns.clone(),
                    executed_at,
                    self.config.component_id.clone(),
                    self.config.executor_id,
                    lock_expires_at,
                    run_id,
                )
                .await
                .map_err(|err| {
                    warn!("lock_pending error {err:?}");
                })?;
            while permits.len() > locked_executions.len() {
                permits.pop();
            }
            assert_eq!(permits.len(), locked_executions.len());
            locked_executions.into_iter().zip(permits)
        };
        let execution_deadline = executed_at + self.config.lock_expiry;

        let mut executions = Vec::with_capacity(locked_executions.len());
        for (locked_execution, permit) in locked_executions {
            let execution_id = locked_execution.execution_id.clone();
            let join_handle = {
                let worker = self.worker.clone();
                let db_pool = self.db_pool.clone();
                let clock_fn = self.clock_fn.clone();
                let run_id = locked_execution.run_id;
                let worker_span = info_span!(parent: None, "worker",
                    "otel.name" = format!("worker {}", locked_execution.ffqn),
                    %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.config.executor_id, component_id = %self.config.component_id);
                locked_execution.metadata.enrich(&worker_span);
                tokio::spawn({
                    let worker_span2 = worker_span.clone();
                    async move {
                        let _permit = permit;
                        let res = Self::run_worker(
                            worker,
                            db_pool.as_ref(),
                            execution_deadline,
                            clock_fn,
                            locked_execution,
                            worker_span2,
                        )
                        .await;
                        if let Err(db_error) = res {
                            error!("Got DbError `{db_error:?}`, expecting watcher to mark execution as timed out");
                        }
                    }
                    .instrument(worker_span)
                })
            };
            executions.push((execution_id, join_handle));
        }
        Ok(ExecutionProgress { executions })
    }

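    /// Runs the worker for a single locked execution and appends the resulting event, if any, to the database.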
    async fn run_worker(
        worker: Arc<dyn Worker>,
        db_pool: &dyn DbPool,
        execution_deadline: DateTime<Utc>,
        clock_fn: C,
        locked_execution: LockedExecution,
        worker_span: Span,
    ) -> Result<(), DbError> {
        debug!("Worker::run starting");
        trace!(
            version = %locked_execution.version,
            params = ?locked_execution.params,
            event_history = ?locked_execution.event_history,
            "Worker::run starting"
        );
        let can_be_retried = ExecutionLog::can_be_retried_after(
            locked_execution.temporary_event_count + 1,
            locked_execution.max_retries,
            locked_execution.retry_exp_backoff,
        );
        let unlock_expiry_on_limit_reached =
            ExecutionLog::compute_retry_duration_when_retrying_forever(
                locked_execution.temporary_event_count + 1,
                locked_execution.retry_exp_backoff,
            );
        let ctx = WorkerContext {
            execution_id: locked_execution.execution_id.clone(),
            metadata: locked_execution.metadata,
            ffqn: locked_execution.ffqn,
            params: locked_execution.params,
            event_history: locked_execution.event_history,
            responses: locked_execution
                .responses
                .into_iter()
                .map(|outer| outer.event)
                .collect(),
            version: locked_execution.version,
            execution_deadline,
            can_be_retried: can_be_retried.is_some(),
            run_id: locked_execution.run_id,
            worker_span,
        };
        let worker_result = worker.run(ctx).await;
        trace!(?worker_result, "Worker::run finished");
        let result_obtained_at = clock_fn.now();
        match Self::worker_result_to_execution_event(
            locked_execution.execution_id,
            worker_result,
            result_obtained_at,
            locked_execution.parent,
            can_be_retried,
            unlock_expiry_on_limit_reached,
        )? {
            Some(append) => {
                let db_connection = db_pool.connection();
                trace!("Appending {append:?}");
                append.append(db_connection.as_ref()).await
            }
            None => Ok(()),
        }
    }

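    /// Maps a `WorkerResult` to the event to append, if any: success finishes the execution,
    /// errors either back off (temporary failure, temporary timeout, unlock) or finish it
    /// permanently, and `DbUpdatedByWorkerOrWatcher` appends nothing.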
    fn worker_result_to_execution_event(
        execution_id: ExecutionId,
        worker_result: WorkerResult,
        result_obtained_at: DateTime<Utc>,
        parent: Option<(ExecutionId, JoinSetId)>,
        can_be_retried: Option<Duration>,
        unlock_expiry_on_limit_reached: Duration,
    ) -> Result<Option<Append>, DbError> {
        Ok(match worker_result {
            WorkerResult::Ok(result, new_version, http_client_traces) => {
                info!(
                    "Execution finished: {}",
                    result.as_pending_state_finished_result()
                );
                let child_finished =
                    parent.map(
                        |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
                            parent_execution_id,
                            parent_join_set,
                            result: Ok(result.clone()),
                        },
                    );
                let primary_event = AppendRequest {
                    created_at: result_obtained_at,
                    event: ExecutionEventInner::Finished {
                        result: Ok(result),
                        http_client_traces,
                    },
                };

                Some(Append {
                    created_at: result_obtained_at,
                    primary_event,
                    execution_id,
                    version: new_version,
                    child_finished,
                })
            }
            WorkerResult::DbUpdatedByWorkerOrWatcher => None,
            WorkerResult::Err(err) => {
                let reason_full = err.to_string();
                let (primary_event, child_finished, version) = match err {
                    WorkerError::TemporaryTimeout {
                        http_client_traces,
                        version,
                    } => {
                        if let Some(duration) = can_be_retried {
                            let backoff_expires_at = result_obtained_at + duration;
                            info!(
                                "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
                            );
                            (
                                ExecutionEventInner::TemporarilyTimedOut {
                                    backoff_expires_at,
                                    http_client_traces,
                                },
                                None,
                                version,
                            )
                        } else {
                            info!("Permanent timeout");
                            let result = Err(FinishedExecutionError::PermanentTimeout);
                            let child_finished =
                                parent.map(|(parent_execution_id, parent_join_set)| {
                                    ChildFinishedResponse {
                                        parent_execution_id,
                                        parent_join_set,
                                        result: result.clone(),
                                    }
                                });
                            (
                                ExecutionEventInner::Finished {
                                    result,
                                    http_client_traces,
                                },
                                child_finished,
                                version,
                            )
                        }
                    }
                    WorkerError::DbError(db_error) => {
                        return Err(db_error);
                    }
                    WorkerError::ActivityTrap {
                        reason: reason_inner,
                        trap_kind,
                        detail,
                        version,
                        http_client_traces,
                    } => retry_or_fail(
                        result_obtained_at,
                        parent,
                        can_be_retried,
                        reason_full,
                        reason_inner,
                        trap_kind,
                        detail,
                        version,
                        http_client_traces,
                    ),
                    WorkerError::ActivityPreopenedDirError {
                        reason_kind,
                        reason_inner,
                        version,
                    } => retry_or_fail(
                        result_obtained_at,
                        parent,
                        can_be_retried,
                        reason_full,
                        reason_inner,
                        reason_kind,
                        None,
                        version,
                        None,
                    ),
                    WorkerError::ActivityReturnedError {
                        detail,
                        version,
                        http_client_traces,
                    } => {
                        let duration = can_be_retried.expect(
                            "ActivityReturnedError must not be returned when retries are exhausted",
                        );
                        let expires_at = result_obtained_at + duration;
                        debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
                        (
                            ExecutionEventInner::TemporarilyFailed {
                                backoff_expires_at: expires_at,
                                reason_full: StrVariant::Static("activity returned error"),
                                reason_inner: StrVariant::Static("activity returned error"),
                                detail,
                                http_client_traces,
                            },
                            None,
                            version,
                        )
                    }
                    WorkerError::LimitReached {
                        reason: inner_reason,
                        version: new_version,
                    } => {
                        let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
                        warn!(
                            "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
                        );
                        (
                            ExecutionEventInner::Unlocked {
                                backoff_expires_at: expires_at,
                                reason: StrVariant::from(reason_full),
                            },
                            None,
                            new_version,
                        )
                    }
                    WorkerError::FatalError(fatal_error, version) => {
                        info!("Fatal worker error - {fatal_error:?}");
                        let result = Err(FinishedExecutionError::from(fatal_error));
                        let child_finished =
                            parent.map(|(parent_execution_id, parent_join_set)| {
                                ChildFinishedResponse {
                                    parent_execution_id,
                                    parent_join_set,
                                    result: result.clone(),
                                }
                            });
                        (
                            ExecutionEventInner::Finished {
                                result,
                                http_client_traces: None,
                            },
                            child_finished,
                            version,
                        )
                    }
                };
                Some(Append {
                    created_at: result_obtained_at,
                    primary_event: AppendRequest {
                        created_at: result_obtained_at,
                        event: primary_event,
                    },
                    execution_id,
                    version,
                    child_finished,
                })
            }
        })
    }
}

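/// Schedules a retry (`TemporarilyFailed`) while a backoff is still available, otherwise
/// finishes the execution as a permanent failure and, for child executions, prepares the
/// response to the parent's join set.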
#[expect(clippy::too_many_arguments)]
fn retry_or_fail(
    result_obtained_at: DateTime<Utc>,
    parent: Option<(ExecutionId, JoinSetId)>,
    can_be_retried: Option<Duration>,
    reason_full: String,
    reason_inner: String,
    err_kind_for_logs: impl Display,
    detail: Option<String>,
    version: Version,
    http_client_traces: Option<Vec<concepts::storage::http_client_trace::HttpClientTrace>>,
) -> (ExecutionEventInner, Option<ChildFinishedResponse>, Version) {
    if let Some(duration) = can_be_retried {
        let expires_at = result_obtained_at + duration;
        debug!(
            "Retrying activity with `{err_kind_for_logs}` execution after {duration:?} at {expires_at}"
        );
        (
            ExecutionEventInner::TemporarilyFailed {
                backoff_expires_at: expires_at,
                reason_full: StrVariant::from(reason_full),
                reason_inner: StrVariant::from(reason_inner),
                detail,
                http_client_traces,
            },
            None,
            version,
        )
    } else {
        info!("Activity with `{err_kind_for_logs}` marked as permanent failure - {reason_inner}");
        let result = Err(FinishedExecutionError::PermanentFailure {
            reason_inner,
            reason_full,
            kind: PermanentFailureKind::ActivityTrap,
            detail,
        });
        let child_finished =
            parent.map(
                |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
                    parent_execution_id,
                    parent_join_set,
                    result: result.clone(),
                },
            );
        (
            ExecutionEventInner::Finished {
                result,
                http_client_traces,
            },
            child_finished,
            version,
        )
    }
}

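/// Response delivered to the parent's join set when a child execution finishes.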
#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    pub(crate) parent_execution_id: ExecutionId,
    pub(crate) parent_join_set: JoinSetId,
    pub(crate) result: FinishedExecutionResult,
}

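/// Pending append of the primary event plus, for child executions, the notification to the parent.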
#[derive(Debug, Clone)]
pub(crate) struct Append {
    pub(crate) created_at: DateTime<Utc>,
    pub(crate) primary_event: AppendRequest,
    pub(crate) execution_id: ExecutionId,
    pub(crate) version: Version,
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}

impl Append {
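    /// Writes the primary event; when `child_finished` is set, the event must be `Finished`,
    /// the execution id must be derived, and the parent is notified in the same call.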
    pub(crate) async fn append(self, db_connection: &dyn DbConnection) -> Result<(), DbError> {
        if let Some(child_finished) = self.child_finished {
            assert_matches!(
                &self.primary_event,
                AppendRequest {
                    event: ExecutionEventInner::Finished { .. },
                    ..
                }
            );
            let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
            db_connection
                .append_batch_respond_to_parent(
                    derived.clone(),
                    self.created_at,
                    vec![self.primary_event],
                    self.version.clone(),
                    child_finished.parent_execution_id,
                    JoinSetResponseEventOuter {
                        created_at: self.created_at,
                        event: JoinSetResponseEvent {
                            join_set_id: child_finished.parent_join_set,
                            event: JoinSetResponse::ChildExecutionFinished {
                                child_execution_id: derived,
                                finished_version: self.version,
                                result: child_finished.result,
                            },
                        },
                    },
                )
                .await?;
        } else {
            db_connection
                .append_batch(
                    self.created_at,
                    vec![self.primary_event],
                    self.execution_id,
                    self.version,
                )
                .await?;
        }
        Ok(())
    }
}

#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

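    /// Test worker that pops a pre-programmed `WorkerResult` on each `run` call,
    /// asserting the expected version and event history.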
    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    impl SimpleWorker {
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(IndexMap::from([(
                Version::new(2),
                (vec![], res),
            )]))))
        }

        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            Self {
                worker_results_rev: self.worker_results_rev,
                exported: [FunctionMetadata {
                    ffqn: ffqn.clone(),
                    parameter_types: ParameterTypes::default(),
                    return_type: None,
                    extension: None,
                    submittable: true,
                }],
                ffqn,
            }
        }

        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: [FunctionMetadata {
                    ffqn: FFQN_SOME,
                    parameter_types: ParameterTypes::default(),
                    return_type: None,
                    extension: None,
                    submittable: true,
                }],
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            let (expected_version, (expected_eh, worker_result)) =
                self.worker_results_rev.lock().unwrap().pop().unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}

#[cfg(test)]
mod tests {
    use self::simple_worker::SimpleWorker;
    use super::*;
    use crate::worker::FatalError;
    use crate::{expired_timers_watcher, worker::WorkerResult};
    use assert_matches::assert_matches;
    use async_trait::async_trait;
    use concepts::storage::{CreateRequest, JoinSetRequest};
    use concepts::storage::{ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState};
    use concepts::time::Now;
    use concepts::{
        ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, StrVariant,
        SupportedFunctionReturnValue, TrapKind,
    };
    use db_tests::Database;
    use indexmap::IndexMap;
    use simple_worker::FFQN_SOME;
    use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
    use test_utils::set_up;
    use test_utils::sim_clock::{ConstClock, SimClock};

    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");

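    /// Ticks once with a freshly constructed executor and waits until all spawned executions finish.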
    async fn tick_fn<W: Worker + Debug, C: ClockFn + 'static>(
        config: ExecConfig,
        clock_fn: C,
        db_pool: Arc<dyn DbPool>,
        worker: Arc<W>,
        executed_at: DateTime<Utc>,
    ) -> ExecutionProgress {
        trace!("Ticking with {worker:?}");
        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
        let executor = ExecTask::new(worker, config, clock_fn, db_pool, ffqns);
        let mut execution_progress = executor.tick(executed_at, RunId::generate()).await.unwrap();
        loop {
            execution_progress
                .executions
                .retain(|(_, abort_handle)| !abort_handle.is_finished());
            if execution_progress.executions.is_empty() {
                return execution_progress;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    }

    #[tokio::test]
    async fn execute_simple_lifecycle_tick_based_mem() {
        let created_at = Now.now();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
        db_pool.close().await.unwrap();
    }

    #[tokio::test]
    async fn execute_simple_lifecycle_tick_based_sqlite() {
        let created_at = Now.now();
        let (_guard, db_pool) = Database::Sqlite.set_up().await;
        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
        db_pool.close().await.unwrap();
    }

    async fn execute_simple_lifecycle_tick_based<C: ClockFn + 'static>(
        pool: Arc<dyn DbPool>,
        clock_fn: C,
    ) {
        set_up();
        let created_at = clock_fn.now();
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::from_millis(100),
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
        };

        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            pool,
            exec_config,
            Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
                SupportedFunctionReturnValue::None,
                Version::new(2),
                None,
            ))),
            tick_fn,
        )
        .await;
        assert_matches!(
            execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Ok(SupportedFunctionReturnValue::None),
                    http_client_traces: None
                },
                created_at: _,
                backtrace_id: None,
            }
        );
    }

    #[tokio::test]
    async fn stochastic_execute_simple_lifecycle_task_based_mem() {
        set_up();
        let created_at = Now.now();
        let clock_fn = ConstClock(created_at);
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
        };

        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
            SupportedFunctionReturnValue::None,
            Version::new(2),
            None,
        )));
        let exec_task = ExecTask::spawn_new(
            worker.clone(),
            exec_config.clone(),
            clock_fn,
            db_pool.clone(),
        );

        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            db_pool.clone(),
            exec_config,
            worker,
            |_, _, _, _, _| async {
                tokio::time::sleep(Duration::from_secs(1)).await;
                ExecutionProgress::default()
            },
        )
        .await;
        exec_task.close().await;
        db_pool.close().await.unwrap();
        assert_matches!(
            execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Ok(SupportedFunctionReturnValue::None),
                    http_client_traces: None
                },
                created_at: _,
                backtrace_id: None,
            }
        );
    }

    struct CreateAndTickConfig {
        execution_id: ExecutionId,
        created_at: DateTime<Utc>,
        max_retries: u32,
        executed_at: DateTime<Utc>,
        retry_exp_backoff: Duration,
    }

    async fn create_and_tick<
        W: Worker,
        C: ClockFn,
        T: FnMut(ExecConfig, C, Arc<dyn DbPool>, Arc<W>, DateTime<Utc>) -> F,
        F: Future<Output = ExecutionProgress>,
    >(
        config: CreateAndTickConfig,
        clock_fn: C,
        db_pool: Arc<dyn DbPool>,
        exec_config: ExecConfig,
        worker: Arc<W>,
        mut tick: T,
    ) -> ExecutionLog {
        let db_connection = db_pool.connection();
        db_connection
            .create(CreateRequest {
                created_at: config.created_at,
                execution_id: config.execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: config.created_at,
                retry_exp_backoff: config.retry_exp_backoff,
                max_retries: config.max_retries,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();
        tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
        debug!("Execution history after tick: {execution_log:?}");
        assert_matches!(
            execution_log.events.first().unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Created { .. },
                created_at: actually_created_at,
                backtrace_id: None,
            }
            if config.created_at == *actually_created_at
        );
        let locked_at = assert_matches!(
            execution_log.events.get(1).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: locked_at,
                backtrace_id: None,
            } if config.created_at <= *locked_at
            => *locked_at
        );
        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
            event: _,
            created_at: executed_at,
            backtrace_id: None,
        } if *executed_at >= locked_at);
        execution_log
    }

    #[tokio::test]
    async fn activity_trap_should_trigger_an_execution_retry() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
        };
        let expected_reason = "error reason";
        let expected_detail = "error detail";
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: Some(expected_detail.to_string()),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        let retry_exp_backoff = Duration::from_millis(100);
        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at: sim_clock.now(),
                max_retries: 1,
                executed_at: sim_clock.now(),
                retry_exp_backoff,
            },
            sim_clock.clone(),
            db_pool.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        assert_eq!(3, execution_log.events.len());
        {
            let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
                &execution_log.events.get(2).unwrap(),
                ExecutionEvent {
                    event: ExecutionEventInner::TemporarilyFailed {
                        reason_inner,
                        reason_full,
                        detail,
                        backoff_expires_at,
                        http_client_traces: None,
                    },
                    created_at: at,
                    backtrace_id: None,
                }
                => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
            );
            assert_eq!(expected_reason, reason_inner.deref());
            assert_eq!(
                format!("activity trap: {expected_reason}"),
                reason_full.deref()
            );
            assert_eq!(Some(expected_detail), detail.as_deref());
            assert_eq!(at, sim_clock.now());
            assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
        }
        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
            std::sync::Mutex::new(IndexMap::from([(
                Version::new(4),
                (
                    vec![],
                    WorkerResult::Ok(SupportedFunctionReturnValue::None, Version::new(4), None),
                ),
            )])),
        )));
        assert!(
            tick_fn(
                exec_config.clone(),
                sim_clock.clone(),
                db_pool.clone(),
                worker.clone(),
                sim_clock.now(),
            )
            .await
            .executions
            .is_empty()
        );
        sim_clock.move_time_forward(retry_exp_backoff);
        tick_fn(
            exec_config,
            sim_clock.clone(),
            db_pool.clone(),
            worker,
            sim_clock.now(),
        )
        .await;
        let execution_log = {
            let db_connection = db_pool.connection();
            db_connection
                .get(&execution_log.execution_id)
                .await
                .unwrap()
        };
        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
        assert_matches!(
            execution_log.events.get(3).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: at,
                backtrace_id: None,
            } if *at == sim_clock.now()
        );
        assert_matches!(
            execution_log.events.get(4).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Ok(SupportedFunctionReturnValue::None),
                    http_client_traces: None
                },
                created_at: finished_at,
                backtrace_id: None,
            } if *finished_at == sim_clock.now()
        );
        db_pool.close().await.unwrap();
    }

    #[tokio::test]
    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
        set_up();
        let created_at = Now.now();
        let clock_fn = ConstClock(created_at);
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
        };

        let expected_reason = "error reason";
        let expected_detail = "error detail";
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: Some(expected_detail.to_string()),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            db_pool.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        assert_eq!(3, execution_log.events.len());
        let (reason, kind, detail) = assert_matches!(
            &execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Err(FinishedExecutionError::PermanentFailure { reason_inner, kind, detail, reason_full: _ }),
                    http_client_traces: None
                },
                created_at: at,
                backtrace_id: None,
            } if *at == created_at
            => (reason_inner, kind, detail)
        );
        assert_eq!(expected_reason, *reason);
        assert_eq!(Some(expected_detail), detail.as_deref());
        assert_eq!(PermanentFailureKind::ActivityTrap, *kind);

        db_pool.close().await.unwrap();
    }

    #[tokio::test]
    async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
        let worker_error = WorkerError::ActivityTrap {
            reason: "error reason".to_string(),
            trap_kind: TrapKind::Trap,
            detail: Some("detail".to_string()),
            version: Version::new(2),
            http_client_traces: None,
        };
        let expected_child_err = FinishedExecutionError::PermanentFailure {
            reason_full: "activity trap: error reason".to_string(),
            reason_inner: "error reason".to_string(),
            kind: PermanentFailureKind::ActivityTrap,
            detail: Some("detail".to_string()),
        };
        child_execution_permanently_failed_should_notify_parent(
            WorkerResult::Err(worker_error),
            expected_child_err,
        )
        .await;
    }

    #[tokio::test]
    async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
        let expected_child_err = FinishedExecutionError::PermanentTimeout;
        child_execution_permanently_failed_should_notify_parent(
            WorkerResult::DbUpdatedByWorkerOrWatcher,
            expected_child_err,
        )
        .await;
    }

    #[tokio::test]
    async fn child_execution_permanently_failed_should_notify_parent_unhandled_child() {
        let parent_id = ExecutionId::from_parts(1, 1);
        let join_set_id_outer =
            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("outer")).unwrap();
        let root_cause_id = parent_id.next_level(&join_set_id_outer);
        let join_set_id_inner =
            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("inner")).unwrap();
        let child_execution_id = root_cause_id.next_level(&join_set_id_inner);
        let worker_error = WorkerError::FatalError(
            FatalError::UnhandledChildExecutionError {
                child_execution_id: child_execution_id.clone(),
                root_cause_id: root_cause_id.clone(),
            },
            Version::new(2),
        );
        let expected_child_err = FinishedExecutionError::UnhandledChildExecutionError {
            child_execution_id,
            root_cause_id,
        };
        child_execution_permanently_failed_should_notify_parent(
            WorkerResult::Err(worker_error),
            expected_child_err,
        )
        .await;
    }

    async fn child_execution_permanently_failed_should_notify_parent(
        worker_result: WorkerResult,
        expected_child_err: FinishedExecutionError,
    ) {
        use concepts::storage::JoinSetResponseEventOuter;
        const LOCK_EXPIRY: Duration = Duration::from_secs(1);

        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;

        let parent_worker = Arc::new(SimpleWorker::with_single_result(
            WorkerResult::DbUpdatedByWorkerOrWatcher,
        ));
        let parent_execution_id = ExecutionId::generate();
        db_pool
            .connection()
            .create(CreateRequest {
                created_at: sim_clock.now(),
                execution_id: parent_execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: Duration::ZERO,
                max_retries: 0,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();
        tick_fn(
            ExecConfig {
                batch_size: 1,
                lock_expiry: LOCK_EXPIRY,
                tick_sleep: Duration::ZERO,
                component_id: ComponentId::dummy_activity(),
                task_limiter: None,
                executor_id: ExecutorId::generate(),
            },
            sim_clock.clone(),
            db_pool.clone(),
            parent_worker,
            sim_clock.now(),
        )
        .await;

        let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
        let child_execution_id = parent_execution_id.next_level(&join_set_id);
        {
            let child = CreateRequest {
                created_at: sim_clock.now(),
                execution_id: ExecutionId::Derived(child_execution_id.clone()),
                ffqn: FFQN_CHILD,
                params: Params::empty(),
                parent: Some((parent_execution_id.clone(), join_set_id.clone())),
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: Duration::ZERO,
                max_retries: 0,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            };
            let current_time = sim_clock.now();
            let join_set = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinSetCreate {
                        join_set_id: join_set_id.clone(),
                        closing_strategy: ClosingStrategy::Complete,
                    },
                },
            };
            let child_exec_req = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinSetRequest {
                        join_set_id: join_set_id.clone(),
                        request: JoinSetRequest::ChildExecutionRequest {
                            child_execution_id: child_execution_id.clone(),
                        },
                    },
                },
            };
            let join_next = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinNext {
                        join_set_id: join_set_id.clone(),
                        run_expires_at: sim_clock.now(),
                        closing: false,
                        requested_ffqn: Some(FFQN_CHILD),
                    },
                },
            };
            db_pool
                .connection()
                .append_batch_create_new_execution(
                    current_time,
                    vec![join_set, child_exec_req, join_next],
                    parent_execution_id.clone(),
                    Version::new(2),
                    vec![child],
                )
                .await
                .unwrap();
        }

        let child_worker =
            Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));

        tick_fn(
            ExecConfig {
                batch_size: 1,
                lock_expiry: LOCK_EXPIRY,
                tick_sleep: Duration::ZERO,
                component_id: ComponentId::dummy_activity(),
                task_limiter: None,
                executor_id: ExecutorId::generate(),
            },
            sim_clock.clone(),
            db_pool.clone(),
            child_worker,
            sim_clock.now(),
        )
        .await;
        if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
            sim_clock.move_time_forward(LOCK_EXPIRY);
            expired_timers_watcher::tick(db_pool.connection().as_ref(), sim_clock.now())
                .await
                .unwrap();
        }
        let child_log = db_pool
            .connection()
            .get(&ExecutionId::Derived(child_execution_id.clone()))
            .await
            .unwrap();
        assert!(child_log.pending_state.is_finished());
        assert_eq!(
            Version(2),
            child_log.next_version,
            "created = 0, locked = 1, with_single_result = 2"
        );
        assert_eq!(
            ExecutionEventInner::Finished {
                result: Err(expected_child_err),
                http_client_traces: None
            },
            child_log.last_event().event
        );
        let parent_log = db_pool
            .connection()
            .get(&parent_execution_id)
            .await
            .unwrap();
        assert_matches!(
            parent_log.pending_state,
            PendingState::PendingAt {
                scheduled_at
            } if scheduled_at == sim_clock.now(),
            "parent should be back to pending"
        );
        let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
            parent_log.responses.last(),
            Some(JoinSetResponseEventOuter {
                created_at: at,
                event: JoinSetResponseEvent {
                    join_set_id: found_join_set_id,
                    event: JoinSetResponse::ChildExecutionFinished {
                        child_execution_id: found_child_execution_id,
                        finished_version,
                        result: found_result,
                    }
                }
            })
            if *at == sim_clock.now()
            => (found_join_set_id, found_child_execution_id, finished_version, found_result)
        );
        assert_eq!(join_set_id, *found_join_set_id);
        assert_eq!(child_execution_id, *found_child_execution_id);
        assert_eq!(child_log.next_version, *child_finished_version);
        assert!(found_result.is_err());

        db_pool.close().await.unwrap();
    }

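    /// Test worker that sleeps for `duration` before returning `result`, simulating an execution that outlives its lock.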
    #[derive(Clone, Debug)]
    struct SleepyWorker {
        duration: Duration,
        result: SupportedFunctionReturnValue,
        exported: [FunctionMetadata; 1],
    }

    #[async_trait]
    impl Worker for SleepyWorker {
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            tokio::time::sleep(self.duration).await;
            WorkerResult::Ok(self.result.clone(), ctx.version, None)
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }

    #[tokio::test]
    async fn hanging_lock_should_be_cleaned_and_execution_retried() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let lock_expiry = Duration::from_millis(100);
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry,
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
        };

        let worker = Arc::new(SleepyWorker {
            duration: lock_expiry + Duration::from_millis(1),
            result: SupportedFunctionReturnValue::None,
            exported: [FunctionMetadata {
                ffqn: FFQN_SOME,
                parameter_types: ParameterTypes::default(),
                return_type: None,
                extension: None,
                submittable: true,
            }],
        });
        let execution_id = ExecutionId::generate();
        let timeout_duration = Duration::from_millis(300);
        let db_connection = db_pool.connection();
        db_connection
            .create(CreateRequest {
                created_at: sim_clock.now(),
                execution_id: execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: timeout_duration,
                max_retries: 1,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();

        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
        let executor = ExecTask::new(
            worker,
            exec_config,
            sim_clock.clone(),
            db_pool.clone(),
            ffqns,
        );
        let mut first_execution_progress = executor
            .tick(sim_clock.now(), RunId::generate())
            .await
            .unwrap();
        assert_eq!(1, first_execution_progress.executions.len());
        sim_clock.move_time_forward(lock_expiry);
        let now_after_first_lock_expiry = sim_clock.now();
        {
            debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
            let cleanup_progress = executor
                .tick(now_after_first_lock_expiry, RunId::generate())
                .await
                .unwrap();
            assert!(cleanup_progress.executions.is_empty());
        }
        {
            let expired_locks = expired_timers_watcher::tick(
                db_pool.connection().as_ref(),
                now_after_first_lock_expiry,
            )
            .await
            .unwrap()
            .expired_locks;
            assert_eq!(1, expired_locks);
        }
        assert!(
            !first_execution_progress
                .executions
                .pop()
                .unwrap()
                .1
                .is_finished()
        );

        let execution_log = db_connection.get(&execution_id).await.unwrap();
        let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
        assert_matches!(
            &execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
                created_at: at,
                backtrace_id: None,
            } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
        );
        assert_eq!(
            PendingState::PendingAt {
                scheduled_at: expected_first_timeout_expiry
            },
            execution_log.pending_state
        );
        sim_clock.move_time_forward(timeout_duration);
        let now_after_first_timeout = sim_clock.now();
        debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");

        let mut second_execution_progress = executor
            .tick(now_after_first_timeout, RunId::generate())
            .await
            .unwrap();
        assert_eq!(1, second_execution_progress.executions.len());

        sim_clock.move_time_forward(lock_expiry);
        let now_after_second_lock_expiry = sim_clock.now();
        debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
        {
            let cleanup_progress = executor
                .tick(now_after_second_lock_expiry, RunId::generate())
                .await
                .unwrap();
            assert!(cleanup_progress.executions.is_empty());
        }
        {
            let expired_locks = expired_timers_watcher::tick(
                db_pool.connection().as_ref(),
                now_after_second_lock_expiry,
            )
            .await
            .unwrap()
            .expired_locks;
            assert_eq!(1, expired_locks);
        }
        assert!(
            !second_execution_progress
                .executions
                .pop()
                .unwrap()
                .1
                .is_finished()
        );

        drop(db_connection);
        drop(executor);
        db_pool.close().await.unwrap();
    }
}