1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::prefixed_ulid::RunId;
5use concepts::storage::{
6 AppendRequest, DbPool, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
7 LockedExecution,
8};
9use concepts::time::ClockFn;
10use concepts::{ComponentId, FunctionMetadata, StrVariant, SupportedFunctionReturnValue};
11use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
12use concepts::{
13 FinishedExecutionError,
14 storage::{DbConnection, DbError, ExecutionEventInner, JoinSetResponse, Version},
15};
16use concepts::{JoinSetId, PermanentFailureKind};
17use std::fmt::Display;
18use std::{
19 sync::{
20 Arc,
21 atomic::{AtomicBool, Ordering},
22 },
23 time::Duration,
24};
25use tokio::task::{AbortHandle, JoinHandle};
26use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
27
28#[derive(Debug, Clone)]
29pub struct ExecConfig {
30 pub lock_expiry: Duration,
31 pub tick_sleep: Duration,
32 pub batch_size: u32,
33 pub component_id: ComponentId,
34 pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
35 pub executor_id: ExecutorId,
36}
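// A minimal construction sketch. Values here are illustrative only (the tests at the bottom of
// this file use `task_limiter: None` and `ComponentId::dummy_activity()`, a test helper):
//
//     let exec_config = ExecConfig {
//         batch_size: 1,
//         lock_expiry: Duration::from_secs(1),
//         tick_sleep: Duration::from_millis(100),
//         component_id: ComponentId::dummy_activity(),
//         task_limiter: Some(Arc::new(tokio::sync::Semaphore::new(4))),
//         executor_id: ExecutorId::generate(),
//     };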
37
38pub struct ExecTask<C: ClockFn> {
39 worker: Arc<dyn Worker>,
40 config: ExecConfig,
41    clock_fn: C,
42    db_pool: Arc<dyn DbPool>,
43 ffqns: Arc<[FunctionFqn]>,
44}
45
46#[derive(derive_more::Debug, Default)]
47pub struct ExecutionProgress {
48 #[debug(skip)]
49 #[allow(dead_code)]
50 executions: Vec<(ExecutionId, JoinHandle<()>)>,
51}
52
53impl ExecutionProgress {
54 #[cfg(feature = "test")]
55 pub async fn wait_for_tasks(self) -> Result<usize, tokio::task::JoinError> {
56 let execs = self.executions.len();
57 for (_, handle) in self.executions {
58 handle.await?;
59 }
60 Ok(execs)
61 }
62}
63
64#[derive(derive_more::Debug)]
65pub struct ExecutorTaskHandle {
66 #[debug(skip)]
67 is_closing: Arc<AtomicBool>,
68 #[debug(skip)]
69 abort_handle: AbortHandle,
70 component_id: ComponentId,
71 executor_id: ExecutorId,
72}
73
74impl ExecutorTaskHandle {
75 #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
76 pub async fn close(&self) {
77 trace!("Gracefully closing");
78 self.is_closing.store(true, Ordering::Relaxed);
79 while !self.abort_handle.is_finished() {
80 tokio::time::sleep(Duration::from_millis(1)).await;
81 }
82 debug!("Gracefully closed");
83 }
84}
85
86impl Drop for ExecutorTaskHandle {
87 #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
88 fn drop(&mut self) {
89 if self.abort_handle.is_finished() {
90 return;
91 }
92 warn!("Aborting the executor task");
93 self.abort_handle.abort();
94 }
95}
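// Lifecycle sketch: `ExecTask::spawn_new` starts the background loop and hands back an
// `ExecutorTaskHandle`; `close` requests a graceful stop and waits for the loop to observe the
// flag, while dropping the handle without closing aborts the task (see `Drop` above).
//
//     let handle = ExecTask::spawn_new(worker, exec_config, clock_fn, db_pool);
//     // ... pending executions matching the worker's exported functions are locked and run ...
//     handle.close().await;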
96
97#[cfg(feature = "test")]
98pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
99 extract_exported_ffqns_noext(worker)
100}
101
102fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
103 worker
104 .exported_functions()
105 .iter()
106 .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
107 .collect::<Arc<_>>()
108}
109
110impl<C: ClockFn + 'static> ExecTask<C> {
111 #[cfg(feature = "test")]
112 pub fn new(
113 worker: Arc<dyn Worker>,
114 config: ExecConfig,
115 clock_fn: C,
116 db_pool: Arc<dyn DbPool>,
117 ffqns: Arc<[FunctionFqn]>,
118 ) -> Self {
119 Self {
120 worker,
121 config,
122 clock_fn,
123 db_pool,
124 ffqns,
125 }
126 }
127
128 pub fn spawn_new(
129 worker: Arc<dyn Worker>,
130 config: ExecConfig,
131 clock_fn: C,
132 db_pool: Arc<dyn DbPool>,
133 ) -> ExecutorTaskHandle {
134 let is_closing = Arc::new(AtomicBool::default());
135 let is_closing_inner = is_closing.clone();
136 let ffqns = extract_exported_ffqns_noext(worker.as_ref());
137 let component_id = config.component_id.clone();
138 let executor_id = config.executor_id;
139 let abort_handle = tokio::spawn(async move {
140 debug!(executor_id = %config.executor_id, component_id = %config.component_id, "Spawned executor");
141 let task = ExecTask {
142 worker,
143 config,
144 db_pool,
145 ffqns: ffqns.clone(),
146 clock_fn: clock_fn.clone(),
147 };
148 loop {
149 let _ = task.tick(clock_fn.now(), RunId::generate()).await;
150 let executed_at = clock_fn.now();
151 task.db_pool
152 .connection()
153 .wait_for_pending(executed_at, ffqns.clone(), task.config.tick_sleep)
154 .await;
155 if is_closing_inner.load(Ordering::Relaxed) {
156 return;
157 }
158 }
159 })
160 .abort_handle();
161 ExecutorTaskHandle {
162 is_closing,
163 abort_handle,
164 component_id,
165 executor_id,
166 }
167 }
168
169 fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
170 if let Some(task_limiter) = &self.config.task_limiter {
171 let mut locks = Vec::new();
172 for _ in 0..self.config.batch_size {
173 if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
174 locks.push(Some(permit));
175 } else {
176 break;
177 }
178 }
179 locks
180 } else {
181 let mut vec = Vec::with_capacity(self.config.batch_size as usize);
182 for _ in 0..self.config.batch_size {
183 vec.push(None);
184 }
185 vec
186 }
187 }
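    // The permit vector above doubles as the batch budget for one tick: without a `task_limiter`
    // every slot is `None` and up to `batch_size` executions are locked; with a limiter, each
    // locked execution holds an `OwnedSemaphorePermit` for as long as its spawned worker task
    // lives. For example (an illustrative setting, not a recommendation):
    //
    //     task_limiter: Some(Arc::new(tokio::sync::Semaphore::new(2))), // at most 2 in flight
    //
    // caps concurrent executions at two across consecutive ticks even if `batch_size` is larger.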
188
189 #[cfg(feature = "test")]
190 pub async fn tick_test(
191 &self,
192 executed_at: DateTime<Utc>,
193 run_id: RunId,
194 ) -> Result<ExecutionProgress, ()> {
195 self.tick(executed_at, run_id).await
196 }
197
198    #[instrument(level = Level::TRACE, name = "executor.tick", skip_all, fields(executor_id = %self.config.executor_id, component_id = %self.config.component_id))]
199 async fn tick(
200 &self,
201 executed_at: DateTime<Utc>,
202 run_id: RunId,
203 ) -> Result<ExecutionProgress, ()> {
204 let locked_executions = {
205 let mut permits = self.acquire_task_permits();
206 if permits.is_empty() {
207 return Ok(ExecutionProgress::default());
208 }
209 let db_connection = self.db_pool.connection();
210 let lock_expires_at = executed_at + self.config.lock_expiry;
211 let locked_executions = db_connection
212 .lock_pending(
213                    permits.len(),
214                    executed_at,
215                    self.ffqns.clone(),
216                    executed_at,
217                    self.config.component_id.clone(),
218 self.config.executor_id,
219 lock_expires_at,
220 run_id,
221 )
222 .await
223 .map_err(|err| {
224 warn!("lock_pending error {err:?}");
225 })?;
226 while permits.len() > locked_executions.len() {
228 permits.pop();
229 }
230 assert_eq!(permits.len(), locked_executions.len());
231 locked_executions.into_iter().zip(permits)
232 };
233 let execution_deadline = executed_at + self.config.lock_expiry;
234
235 let mut executions = Vec::with_capacity(locked_executions.len());
236 for (locked_execution, permit) in locked_executions {
237 let execution_id = locked_execution.execution_id.clone();
238 let join_handle = {
239 let worker = self.worker.clone();
240 let db_pool = self.db_pool.clone();
241 let clock_fn = self.clock_fn.clone();
242 let run_id = locked_execution.run_id;
243 let worker_span = info_span!(parent: None, "worker",
244 "otel.name" = format!("worker {}", locked_execution.ffqn),
245 %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.config.executor_id, component_id = %self.config.component_id);
246 locked_execution.metadata.enrich(&worker_span);
247 tokio::spawn({
248 let worker_span2 = worker_span.clone();
249 async move {
250 let _permit = permit;
251 let res = Self::run_worker(
252 worker,
253 db_pool.as_ref(),
254 execution_deadline,
255 clock_fn,
256 locked_execution,
257 worker_span2,
258 )
259 .await;
260 if let Err(db_error) = res {
261 error!("Got DbError `{db_error:?}`, expecting watcher to mark execution as timed out");
262 }
263 }
264 .instrument(worker_span)
265 })
266 };
267 executions.push((execution_id, join_handle));
268 }
269 Ok(ExecutionProgress { executions })
270 }
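    // Driving a single tick by hand is how the tests below exercise the executor without the
    // spawned loop (a sketch; `new`, `tick_test` and `wait_for_tasks` are only compiled with the
    // `test` feature):
    //
    //     let executor = ExecTask::new(worker, exec_config, clock_fn.clone(), db_pool, ffqns);
    //     let progress = executor.tick_test(clock_fn.now(), RunId::generate()).await.unwrap();
    //     let finished = progress.wait_for_tasks().await.unwrap(); // number of awaited executions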
271
272 async fn run_worker(
273 worker: Arc<dyn Worker>,
274 db_pool: &dyn DbPool,
275 execution_deadline: DateTime<Utc>,
276 clock_fn: C,
277 locked_execution: LockedExecution,
278 worker_span: Span,
279 ) -> Result<(), DbError> {
280 debug!("Worker::run starting");
281 trace!(
282 version = %locked_execution.version,
283 params = ?locked_execution.params,
284 event_history = ?locked_execution.event_history,
285 "Worker::run starting"
286 );
287 let can_be_retried = ExecutionLog::can_be_retried_after(
288 locked_execution.temporary_event_count + 1,
289 locked_execution.max_retries,
290 locked_execution.retry_exp_backoff,
291 );
292 let unlock_expiry_on_limit_reached =
293 ExecutionLog::compute_retry_duration_when_retrying_forever(
294 locked_execution.temporary_event_count + 1,
295 locked_execution.retry_exp_backoff,
296 );
297 let ctx = WorkerContext {
298 execution_id: locked_execution.execution_id.clone(),
299 metadata: locked_execution.metadata,
300 ffqn: locked_execution.ffqn,
301 params: locked_execution.params,
302 event_history: locked_execution.event_history,
303 responses: locked_execution
304 .responses
305 .into_iter()
306 .map(|outer| outer.event)
307 .collect(),
308 version: locked_execution.version,
309 execution_deadline,
310 can_be_retried: can_be_retried.is_some(),
311 run_id: locked_execution.run_id,
312 worker_span,
313 };
314 let worker_result = worker.run(ctx).await;
315 trace!(?worker_result, "Worker::run finished");
316 let result_obtained_at = clock_fn.now();
317 match Self::worker_result_to_execution_event(
318 locked_execution.execution_id,
319 worker_result,
320 result_obtained_at,
321 locked_execution.parent,
322 can_be_retried,
323 unlock_expiry_on_limit_reached,
324 )? {
325 Some(append) => {
326 let db_connection = db_pool.connection();
327 trace!("Appending {append:?}");
328 append.append(db_connection.as_ref()).await
329 }
330 None => Ok(()),
331 }
332 }
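    // The retry budget is decided before the worker runs: `can_be_retried_after` receives the
    // number of temporary failures including the current attempt and yields the next backoff, or
    // `None` once `max_retries` is exhausted. A rough sketch of the assumed shape (an assumption;
    // the real formula lives in `ExecutionLog` in the `concepts` crate and may differ):
    //
    //     fn backoff_sketch(temporary_events: u32, max_retries: u32, base: Duration) -> Option<Duration> {
    //         (temporary_events <= max_retries)
    //             .then(|| base * 2u32.saturating_pow(temporary_events.saturating_sub(1)))
    //     }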
333
334 fn worker_result_to_execution_event(
336 execution_id: ExecutionId,
337 worker_result: WorkerResult,
338 result_obtained_at: DateTime<Utc>,
339 parent: Option<(ExecutionId, JoinSetId)>,
340 can_be_retried: Option<Duration>,
341 unlock_expiry_on_limit_reached: Duration,
342 ) -> Result<Option<Append>, DbError> {
343 Ok(match worker_result {
344 WorkerResult::Ok(result, new_version, http_client_traces) => {
345 info!(
346 "Execution finished: {}",
347 result.as_pending_state_finished_result()
348 );
349 let child_finished =
350 parent.map(
351 |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
352 parent_execution_id,
353 parent_join_set,
354 result: result.clone(),
355 },
356 );
357 let primary_event = AppendRequest {
358 created_at: result_obtained_at,
359 event: ExecutionEventInner::Finished {
360 result,
361 http_client_traces,
362 },
363 };
364
365 Some(Append {
366 created_at: result_obtained_at,
367 primary_event,
368 execution_id,
369 version: new_version,
370 child_finished,
371 })
372 }
373 WorkerResult::DbUpdatedByWorkerOrWatcher => None,
374 WorkerResult::Err(err) => {
375                let reason_full = err.to_string();
376                let (primary_event, child_finished, version) = match err {
377 WorkerError::TemporaryTimeout {
378 http_client_traces,
379 version,
380 } => {
381 if let Some(duration) = can_be_retried {
382 let backoff_expires_at = result_obtained_at + duration;
383 info!(
384 "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
385 );
386 (
387 ExecutionEventInner::TemporarilyTimedOut {
388 backoff_expires_at,
389 http_client_traces,
390 },
391 None,
392 version,
393 )
394 } else {
395 info!("Permanent timeout");
396 let result = SupportedFunctionReturnValue::ExecutionError(
397 FinishedExecutionError::PermanentTimeout,
398 );
399 let child_finished =
400 parent.map(|(parent_execution_id, parent_join_set)| {
401 ChildFinishedResponse {
402 parent_execution_id,
403 parent_join_set,
404 result: result.clone(),
405 }
406 });
407 (
408 ExecutionEventInner::Finished {
409 result,
410 http_client_traces,
411 },
412 child_finished,
413 version,
414 )
415 }
416 }
417 WorkerError::DbError(db_error) => {
418 return Err(db_error);
419 }
420 WorkerError::ActivityTrap {
421 reason: reason_inner,
422 trap_kind,
423 detail,
424 version,
425 http_client_traces,
426 } => retry_or_fail(
427 result_obtained_at,
428 parent,
429 can_be_retried,
430 reason_full,
431 reason_inner,
432                        trap_kind,
433                        detail,
434 version,
435 http_client_traces,
436 ),
437 WorkerError::ActivityPreopenedDirError {
438 reason_kind,
439 reason_inner,
440 version,
441 } => retry_or_fail(
442 result_obtained_at,
443 parent,
444 can_be_retried,
445 reason_full,
446 reason_inner,
447                        reason_kind,
448                        None, // detail
449                        version,
450                        None, // http_client_traces
451                    ),
452 WorkerError::ActivityReturnedError {
453 detail,
454 version,
455 http_client_traces,
456 } => {
457 let duration = can_be_retried.expect(
458 "ActivityReturnedError must not be returned when retries are exhausted",
459 );
460 let expires_at = result_obtained_at + duration;
461 debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
462 (
463 ExecutionEventInner::TemporarilyFailed {
464 backoff_expires_at: expires_at,
465                                reason_full: StrVariant::Static("activity returned error"),
466                                reason_inner: StrVariant::Static("activity returned error"),
467                                detail,
468                                http_client_traces,
469 },
470 None,
471 version,
472 )
473 }
474 WorkerError::LimitReached {
475 reason: inner_reason,
476 version: new_version,
477 } => {
478 let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
479 warn!(
480 "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
481 );
482 (
483 ExecutionEventInner::Unlocked {
484 backoff_expires_at: expires_at,
485 reason: StrVariant::from(reason_full),
486 },
487 None,
488 new_version,
489 )
490 }
491 WorkerError::FatalError(fatal_error, version) => {
492 info!("Fatal worker error - {fatal_error:?}");
493 let result = SupportedFunctionReturnValue::ExecutionError(
494 FinishedExecutionError::from(fatal_error),
495 );
496 let child_finished =
497 parent.map(|(parent_execution_id, parent_join_set)| {
498 ChildFinishedResponse {
499 parent_execution_id,
500 parent_join_set,
501 result: result.clone(),
502 }
503 });
504 (
505 ExecutionEventInner::Finished {
506 result,
507 http_client_traces: None,
508 },
509 child_finished,
510 version,
511 )
512 }
513 };
514 Some(Append {
515 created_at: result_obtained_at,
516 primary_event: AppendRequest {
517 created_at: result_obtained_at,
518 event: primary_event,
519 },
520 execution_id,
521 version,
522 child_finished,
523 })
524 }
525 })
526 }
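    // Summary of `worker_result_to_execution_event`:
    //   WorkerResult::Ok                          -> Finished (plus a ChildExecutionFinished response if a parent exists)
    //   WorkerResult::DbUpdatedByWorkerOrWatcher  -> nothing to append here
    //   TemporaryTimeout / ActivityTrap /
    //   ActivityPreopenedDirError                 -> TemporarilyTimedOut or TemporarilyFailed while retries remain,
    //                                                otherwise Finished with a permanent error
    //   ActivityReturnedError                     -> TemporarilyFailed (retries must still be available)
    //   LimitReached                              -> Unlocked with a backoff
    //   FatalError                                -> Finished with the corresponding permanent error
    //   DbError                                   -> returned to the caller as Err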
527}
528
529#[expect(clippy::too_many_arguments)]
530fn retry_or_fail(
531 result_obtained_at: DateTime<Utc>,
532 parent: Option<(ExecutionId, JoinSetId)>,
533 can_be_retried: Option<Duration>,
534 reason_full: String,
535 reason_inner: String,
536 err_kind_for_logs: impl Display,
537 detail: Option<String>,
538 version: Version,
539 http_client_traces: Option<Vec<concepts::storage::http_client_trace::HttpClientTrace>>,
540) -> (ExecutionEventInner, Option<ChildFinishedResponse>, Version) {
541 if let Some(duration) = can_be_retried {
542 let expires_at = result_obtained_at + duration;
543 debug!(
544 "Retrying activity with `{err_kind_for_logs}` execution after {duration:?} at {expires_at}"
545 );
546 (
547 ExecutionEventInner::TemporarilyFailed {
548 backoff_expires_at: expires_at,
549 reason_full: StrVariant::from(reason_full),
550 reason_inner: StrVariant::from(reason_inner),
551 detail,
552 http_client_traces,
553 },
554 None,
555 version,
556 )
557 } else {
558 info!("Activity with `{err_kind_for_logs}` marked as permanent failure - {reason_inner}");
559 let result = SupportedFunctionReturnValue::ExecutionError(
560 FinishedExecutionError::PermanentFailure {
561 reason_inner,
562 reason_full,
563 kind: PermanentFailureKind::ActivityTrap,
564 detail,
565 },
566 );
567
568 let child_finished =
569 parent.map(
570 |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
571 parent_execution_id,
572 parent_join_set,
573 result: result.clone(),
574 },
575 );
576 (
577 ExecutionEventInner::Finished {
578 result,
579 http_client_traces,
580 },
581 child_finished,
582 version,
583 )
584 }
585}
586
587#[derive(Debug, Clone)]
588pub(crate) struct ChildFinishedResponse {
589 pub(crate) parent_execution_id: ExecutionId,
590 pub(crate) parent_join_set: JoinSetId,
591 pub(crate) result: SupportedFunctionReturnValue,
592}
593
594#[derive(Debug, Clone)]
595pub(crate) struct Append {
596 pub(crate) created_at: DateTime<Utc>,
597 pub(crate) primary_event: AppendRequest,
598 pub(crate) execution_id: ExecutionId,
599 pub(crate) version: Version,
600 pub(crate) child_finished: Option<ChildFinishedResponse>,
601}
602
603impl Append {
604 pub(crate) async fn append(self, db_connection: &dyn DbConnection) -> Result<(), DbError> {
605 if let Some(child_finished) = self.child_finished {
606 assert_matches!(
607 &self.primary_event,
608 AppendRequest {
609 event: ExecutionEventInner::Finished { .. },
610 ..
611 }
612 );
613 let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
614 db_connection
615 .append_batch_respond_to_parent(
616 derived.clone(),
617 self.created_at,
618 vec![self.primary_event],
619 self.version.clone(),
620 child_finished.parent_execution_id,
621 JoinSetResponseEventOuter {
622 created_at: self.created_at,
623 event: JoinSetResponseEvent {
624 join_set_id: child_finished.parent_join_set,
625 event: JoinSetResponse::ChildExecutionFinished {
626 child_execution_id: derived,
627 finished_version: self.version,
629 result: child_finished.result,
630 },
631 },
632 },
633 )
634 .await?;
635 } else {
636 db_connection
637 .append_batch(
638 self.created_at,
639 vec![self.primary_event],
640 self.execution_id,
641 self.version,
642 )
643 .await?;
644 }
645 Ok(())
646 }
647}
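// Design note: when the finished execution has a parent, the child's `Finished` event and the
// parent's `ChildExecutionFinished` response are written through a single
// `append_batch_respond_to_parent` call, so the parent never observes a finished child without
// its join-set response (and vice versa).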
648
649#[cfg(any(test, feature = "test"))]
650pub mod simple_worker {
651 use crate::worker::{Worker, WorkerContext, WorkerResult};
652 use async_trait::async_trait;
653 use concepts::{
654 FunctionFqn, FunctionMetadata, ParameterTypes, RETURN_TYPE_DUMMY,
655 storage::{HistoryEvent, Version},
656 };
657 use indexmap::IndexMap;
658 use std::sync::Arc;
659 use tracing::trace;
660
661 pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");
662 pub type SimpleWorkerResultMap =
663 Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;
664
665 #[derive(Clone, Debug)]
666 pub struct SimpleWorker {
667 pub worker_results_rev: SimpleWorkerResultMap,
668 pub ffqn: FunctionFqn,
669 exported: [FunctionMetadata; 1],
670 }
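    // Construction sketch (mirrors the tests at the bottom of this file; `FFQN_CHILD` and
    // `SUPPORTED_RETURN_VALUE_OK_EMPTY` come from that test module, and `worker_result` stands
    // for any prepared `WorkerResult`):
    //
    //     let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
    //         SUPPORTED_RETURN_VALUE_OK_EMPTY,
    //         Version::new(2),
    //         None,
    //     )));
    //     let child_worker = Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));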
671
672 impl SimpleWorker {
673 #[must_use]
674 pub fn with_single_result(res: WorkerResult) -> Self {
675 Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(IndexMap::from([(
676 Version::new(2),
677 (vec![], res),
678 )]))))
679 }
680
681 #[must_use]
682 pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
683 Self {
684 worker_results_rev: self.worker_results_rev,
685 exported: [FunctionMetadata {
686 ffqn: ffqn.clone(),
687 parameter_types: ParameterTypes::default(),
688 return_type: RETURN_TYPE_DUMMY,
689 extension: None,
690 submittable: true,
691 }],
692 ffqn,
693 }
694 }
695
696 #[must_use]
697 pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
698 Self {
699 worker_results_rev,
700 ffqn: FFQN_SOME,
701 exported: [FunctionMetadata {
702 ffqn: FFQN_SOME,
703 parameter_types: ParameterTypes::default(),
704 return_type: RETURN_TYPE_DUMMY,
705 extension: None,
706 submittable: true,
707 }],
708 }
709 }
710 }
711
712 #[async_trait]
713 impl Worker for SimpleWorker {
714 async fn run(&self, ctx: WorkerContext) -> WorkerResult {
715 let (expected_version, (expected_eh, worker_result)) =
716 self.worker_results_rev.lock().unwrap().pop().unwrap();
717 trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
718 assert_eq!(expected_version, ctx.version);
719 assert_eq!(expected_eh, ctx.event_history);
720 worker_result
721 }
722
723 fn exported_functions(&self) -> &[FunctionMetadata] {
724 &self.exported
725 }
726 }
727}
728
729#[cfg(test)]
730mod tests {
731 use self::simple_worker::SimpleWorker;
732 use super::*;
733 use crate::{expired_timers_watcher, worker::WorkerResult};
734 use assert_matches::assert_matches;
735 use async_trait::async_trait;
736 use concepts::storage::{CreateRequest, JoinSetRequest};
737 use concepts::storage::{ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState};
738 use concepts::time::Now;
739 use concepts::{
740 ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, RETURN_TYPE_DUMMY,
741 SUPPORTED_RETURN_VALUE_OK_EMPTY, StrVariant, SupportedFunctionReturnValue, TrapKind,
742 };
743 use db_tests::Database;
744 use indexmap::IndexMap;
745 use simple_worker::FFQN_SOME;
746 use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
747 use test_utils::set_up;
748 use test_utils::sim_clock::{ConstClock, SimClock};
749
750 pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
751
752 async fn tick_fn<W: Worker + Debug, C: ClockFn + 'static>(
753 config: ExecConfig,
754 clock_fn: C,
755 db_pool: Arc<dyn DbPool>,
756 worker: Arc<W>,
757 executed_at: DateTime<Utc>,
758 ) -> ExecutionProgress {
759 trace!("Ticking with {worker:?}");
760 let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
761 let executor = ExecTask::new(worker, config, clock_fn, db_pool, ffqns);
762 let mut execution_progress = executor.tick(executed_at, RunId::generate()).await.unwrap();
763 loop {
764 execution_progress
765 .executions
766 .retain(|(_, abort_handle)| !abort_handle.is_finished());
767 if execution_progress.executions.is_empty() {
768 return execution_progress;
769 }
770 tokio::time::sleep(Duration::from_millis(10)).await;
771 }
772 }
773
774 #[tokio::test]
775 async fn execute_simple_lifecycle_tick_based_mem() {
776 let created_at = Now.now();
777 let (_guard, db_pool) = Database::Memory.set_up().await;
778 execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
779 db_pool.close().await.unwrap();
780 }
781
782 #[tokio::test]
783 async fn execute_simple_lifecycle_tick_based_sqlite() {
784 let created_at = Now.now();
785 let (_guard, db_pool) = Database::Sqlite.set_up().await;
786 execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
787 db_pool.close().await.unwrap();
788 }
789
790 async fn execute_simple_lifecycle_tick_based<C: ClockFn + 'static>(
791 pool: Arc<dyn DbPool>,
792 clock_fn: C,
793 ) {
794 set_up();
795 let created_at = clock_fn.now();
796 let exec_config = ExecConfig {
797 batch_size: 1,
798 lock_expiry: Duration::from_secs(1),
799 tick_sleep: Duration::from_millis(100),
800 component_id: ComponentId::dummy_activity(),
801 task_limiter: None,
802 executor_id: ExecutorId::generate(),
803 };
804
805 let execution_log = create_and_tick(
806 CreateAndTickConfig {
807 execution_id: ExecutionId::generate(),
808 created_at,
809 max_retries: 0,
810 executed_at: created_at,
811 retry_exp_backoff: Duration::ZERO,
812 },
813 clock_fn,
814 pool,
815 exec_config,
816 Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
817 SUPPORTED_RETURN_VALUE_OK_EMPTY,
818 Version::new(2),
819 None,
820 ))),
821 tick_fn,
822 )
823 .await;
824 assert_matches!(
825 execution_log.events.get(2).unwrap(),
826 ExecutionEvent {
827 event: ExecutionEventInner::Finished {
828 result: SupportedFunctionReturnValue::Ok { ok: None },
829 http_client_traces: None
830 },
831 created_at: _,
832 backtrace_id: None,
833 }
834 );
835 }
836
837 #[tokio::test]
838 async fn stochastic_execute_simple_lifecycle_task_based_mem() {
839 set_up();
840 let created_at = Now.now();
841 let clock_fn = ConstClock(created_at);
842 let (_guard, db_pool) = Database::Memory.set_up().await;
843 let exec_config = ExecConfig {
844 batch_size: 1,
845 lock_expiry: Duration::from_secs(1),
846 tick_sleep: Duration::ZERO,
847 component_id: ComponentId::dummy_activity(),
848 task_limiter: None,
849 executor_id: ExecutorId::generate(),
850 };
851
852 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
853 SUPPORTED_RETURN_VALUE_OK_EMPTY,
854 Version::new(2),
855 None,
856 )));
857 let exec_task = ExecTask::spawn_new(
858 worker.clone(),
859 exec_config.clone(),
860 clock_fn,
861 db_pool.clone(),
862 );
863
864 let execution_log = create_and_tick(
865 CreateAndTickConfig {
866 execution_id: ExecutionId::generate(),
867 created_at,
868 max_retries: 0,
869 executed_at: created_at,
870 retry_exp_backoff: Duration::ZERO,
871 },
872 clock_fn,
873 db_pool.clone(),
874 exec_config,
875 worker,
876 |_, _, _, _, _| async {
877                tokio::time::sleep(Duration::from_secs(1)).await;
878                ExecutionProgress::default()
879 },
880 )
881 .await;
882 exec_task.close().await;
883 db_pool.close().await.unwrap();
884 assert_matches!(
885 execution_log.events.get(2).unwrap(),
886 ExecutionEvent {
887 event: ExecutionEventInner::Finished {
888 result: SupportedFunctionReturnValue::Ok { ok: None },
889 http_client_traces: None
890 },
891 created_at: _,
892 backtrace_id: None,
893 }
894 );
895 }
896
897 struct CreateAndTickConfig {
898 execution_id: ExecutionId,
899 created_at: DateTime<Utc>,
900 max_retries: u32,
901 executed_at: DateTime<Utc>,
902 retry_exp_backoff: Duration,
903 }
904
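    // `create_and_tick` drives one full create -> lock -> run cycle through the given `tick`
    // closure and then checks the shape of the resulting log: event 0 must be `Created`, event 1
    // `Locked`, and event 2 whatever outcome the worker produced. Callers below pass either
    // `tick_fn` (poll until the spawned worker task finishes) or a no-op closure that merely
    // waits while a background executor started via `spawn_new` does the work.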
905 async fn create_and_tick<
906 W: Worker,
907 C: ClockFn,
908 T: FnMut(ExecConfig, C, Arc<dyn DbPool>, Arc<W>, DateTime<Utc>) -> F,
909 F: Future<Output = ExecutionProgress>,
910 >(
911 config: CreateAndTickConfig,
912 clock_fn: C,
913 db_pool: Arc<dyn DbPool>,
914 exec_config: ExecConfig,
915 worker: Arc<W>,
916 mut tick: T,
917 ) -> ExecutionLog {
918 let db_connection = db_pool.connection();
920 db_connection
921 .create(CreateRequest {
922 created_at: config.created_at,
923 execution_id: config.execution_id.clone(),
924 ffqn: FFQN_SOME,
925 params: Params::empty(),
926 parent: None,
927 metadata: concepts::ExecutionMetadata::empty(),
928 scheduled_at: config.created_at,
929 retry_exp_backoff: config.retry_exp_backoff,
930 max_retries: config.max_retries,
931 component_id: ComponentId::dummy_activity(),
932 scheduled_by: None,
933 })
934 .await
935 .unwrap();
936 tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
938 let execution_log = db_connection.get(&config.execution_id).await.unwrap();
939 debug!("Execution history after tick: {execution_log:?}");
940 assert_matches!(
942 execution_log.events.first().unwrap(),
943 ExecutionEvent {
944 event: ExecutionEventInner::Created { .. },
945 created_at: actually_created_at,
946 backtrace_id: None,
947 }
948 if config.created_at == *actually_created_at
949 );
950 let locked_at = assert_matches!(
951 execution_log.events.get(1).unwrap(),
952 ExecutionEvent {
953 event: ExecutionEventInner::Locked { .. },
954 created_at: locked_at,
955 backtrace_id: None,
956 } if config.created_at <= *locked_at
957 => *locked_at
958 );
959 assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
960 event: _,
961 created_at: executed_at,
962 backtrace_id: None,
963 } if *executed_at >= locked_at);
964 execution_log
965 }
966
967 #[tokio::test]
968 async fn activity_trap_should_trigger_an_execution_retry() {
969 set_up();
970 let sim_clock = SimClock::default();
971 let (_guard, db_pool) = Database::Memory.set_up().await;
972 let exec_config = ExecConfig {
973 batch_size: 1,
974 lock_expiry: Duration::from_secs(1),
975 tick_sleep: Duration::ZERO,
976 component_id: ComponentId::dummy_activity(),
977 task_limiter: None,
978 executor_id: ExecutorId::generate(),
979 };
980 let expected_reason = "error reason";
981 let expected_detail = "error detail";
982 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
983 WorkerError::ActivityTrap {
984 reason: expected_reason.to_string(),
985 trap_kind: concepts::TrapKind::Trap,
986 detail: Some(expected_detail.to_string()),
987 version: Version::new(2),
988 http_client_traces: None,
989 },
990 )));
991 let retry_exp_backoff = Duration::from_millis(100);
992 debug!(now = %sim_clock.now(), "Creating an execution that should fail");
993 let execution_log = create_and_tick(
994 CreateAndTickConfig {
995 execution_id: ExecutionId::generate(),
996 created_at: sim_clock.now(),
997 max_retries: 1,
998 executed_at: sim_clock.now(),
999 retry_exp_backoff,
1000 },
1001 sim_clock.clone(),
1002 db_pool.clone(),
1003 exec_config.clone(),
1004 worker,
1005 tick_fn,
1006 )
1007 .await;
1008 assert_eq!(3, execution_log.events.len());
1009 {
1010 let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
1011 &execution_log.events.get(2).unwrap(),
1012 ExecutionEvent {
1013 event: ExecutionEventInner::TemporarilyFailed {
1014 reason_inner,
1015 reason_full,
1016 detail,
1017 backoff_expires_at,
1018 http_client_traces: None,
1019 },
1020 created_at: at,
1021 backtrace_id: None,
1022 }
1023 => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
1024 );
1025 assert_eq!(expected_reason, reason_inner.deref());
1026 assert_eq!(
1027 format!("activity trap: {expected_reason}"),
1028 reason_full.deref()
1029 );
1030 assert_eq!(Some(expected_detail), detail.as_deref());
1031 assert_eq!(at, sim_clock.now());
1032 assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
1033 }
1034 let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
1035 std::sync::Mutex::new(IndexMap::from([(
1036 Version::new(4),
1037 (
1038 vec![],
1039 WorkerResult::Ok(SUPPORTED_RETURN_VALUE_OK_EMPTY, Version::new(4), None),
1040 ),
1041 )])),
1042 )));
1043 assert!(
1045 tick_fn(
1046 exec_config.clone(),
1047 sim_clock.clone(),
1048 db_pool.clone(),
1049 worker.clone(),
1050 sim_clock.now(),
1051 )
1052 .await
1053 .executions
1054 .is_empty()
1055 );
1056 sim_clock.move_time_forward(retry_exp_backoff);
1058 tick_fn(
1059 exec_config,
1060 sim_clock.clone(),
1061 db_pool.clone(),
1062 worker,
1063 sim_clock.now(),
1064 )
1065 .await;
1066 let execution_log = {
1067 let db_connection = db_pool.connection();
1068 db_connection
1069 .get(&execution_log.execution_id)
1070 .await
1071 .unwrap()
1072 };
1073 debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
1074 assert_matches!(
1075 execution_log.events.get(3).unwrap(),
1076 ExecutionEvent {
1077 event: ExecutionEventInner::Locked { .. },
1078 created_at: at,
1079 backtrace_id: None,
1080 } if *at == sim_clock.now()
1081 );
1082 assert_matches!(
1083 execution_log.events.get(4).unwrap(),
1084 ExecutionEvent {
1085 event: ExecutionEventInner::Finished {
1086 result: SupportedFunctionReturnValue::Ok{ok:None},
1087 http_client_traces: None
1088 },
1089 created_at: finished_at,
1090 backtrace_id: None,
1091 } if *finished_at == sim_clock.now()
1092 );
1093 db_pool.close().await.unwrap();
1094 }
1095
1096 #[tokio::test]
1097 async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1098 set_up();
1099 let created_at = Now.now();
1100 let clock_fn = ConstClock(created_at);
1101 let (_guard, db_pool) = Database::Memory.set_up().await;
1102 let exec_config = ExecConfig {
1103 batch_size: 1,
1104 lock_expiry: Duration::from_secs(1),
1105 tick_sleep: Duration::ZERO,
1106 component_id: ComponentId::dummy_activity(),
1107 task_limiter: None,
1108 executor_id: ExecutorId::generate(),
1109 };
1110
1111 let expected_reason = "error reason";
1112 let expected_detail = "error detail";
1113 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1114 WorkerError::ActivityTrap {
1115 reason: expected_reason.to_string(),
1116 trap_kind: concepts::TrapKind::Trap,
1117 detail: Some(expected_detail.to_string()),
1118 version: Version::new(2),
1119 http_client_traces: None,
1120 },
1121 )));
1122 let execution_log = create_and_tick(
1123 CreateAndTickConfig {
1124 execution_id: ExecutionId::generate(),
1125 created_at,
1126 max_retries: 0,
1127 executed_at: created_at,
1128 retry_exp_backoff: Duration::ZERO,
1129 },
1130 clock_fn,
1131 db_pool.clone(),
1132 exec_config.clone(),
1133 worker,
1134 tick_fn,
1135 )
1136 .await;
1137 assert_eq!(3, execution_log.events.len());
1138 let (reason, kind, detail) = assert_matches!(
1139 &execution_log.events.get(2).unwrap(),
1140 ExecutionEvent {
1141 event: ExecutionEventInner::Finished{
1142 result: SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_}),
1143 http_client_traces: None
1144 },
1145 created_at: at,
1146 backtrace_id: None,
1147 } if *at == created_at
1148 => (reason_inner, kind, detail)
1149 );
1150 assert_eq!(expected_reason, *reason);
1151 assert_eq!(Some(expected_detail), detail.as_deref());
1152 assert_eq!(PermanentFailureKind::ActivityTrap, *kind);
1153
1154 db_pool.close().await.unwrap();
1155 }
1156
1157 #[tokio::test]
1158 async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1159 let worker_error = WorkerError::ActivityTrap {
1160 reason: "error reason".to_string(),
1161 trap_kind: TrapKind::Trap,
1162 detail: Some("detail".to_string()),
1163 version: Version::new(2),
1164 http_client_traces: None,
1165 };
1166 let expected_child_err = FinishedExecutionError::PermanentFailure {
1167 reason_full: "activity trap: error reason".to_string(),
1168 reason_inner: "error reason".to_string(),
1169 kind: PermanentFailureKind::ActivityTrap,
1170 detail: Some("detail".to_string()),
1171 };
1172 child_execution_permanently_failed_should_notify_parent(
1173 WorkerResult::Err(worker_error),
1174 expected_child_err,
1175 )
1176 .await;
1177 }
1178
1179 #[tokio::test]
1182 async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1183 let expected_child_err = FinishedExecutionError::PermanentTimeout;
1184 child_execution_permanently_failed_should_notify_parent(
1185 WorkerResult::DbUpdatedByWorkerOrWatcher,
1186 expected_child_err,
1187 )
1188 .await;
1189 }
1190
1191 async fn child_execution_permanently_failed_should_notify_parent(
1192 worker_result: WorkerResult,
1193 expected_child_err: FinishedExecutionError,
1194 ) {
1195 use concepts::storage::JoinSetResponseEventOuter;
1196 const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1197
1198 set_up();
1199 let sim_clock = SimClock::default();
1200 let (_guard, db_pool) = Database::Memory.set_up().await;
1201
1202 let parent_worker = Arc::new(SimpleWorker::with_single_result(
1203 WorkerResult::DbUpdatedByWorkerOrWatcher,
1204 ));
1205 let parent_execution_id = ExecutionId::generate();
1206 db_pool
1207 .connection()
1208 .create(CreateRequest {
1209 created_at: sim_clock.now(),
1210 execution_id: parent_execution_id.clone(),
1211 ffqn: FFQN_SOME,
1212 params: Params::empty(),
1213 parent: None,
1214 metadata: concepts::ExecutionMetadata::empty(),
1215 scheduled_at: sim_clock.now(),
1216 retry_exp_backoff: Duration::ZERO,
1217 max_retries: 0,
1218 component_id: ComponentId::dummy_activity(),
1219 scheduled_by: None,
1220 })
1221 .await
1222 .unwrap();
1223 tick_fn(
1224 ExecConfig {
1225 batch_size: 1,
1226 lock_expiry: LOCK_EXPIRY,
1227 tick_sleep: Duration::ZERO,
1228 component_id: ComponentId::dummy_activity(),
1229 task_limiter: None,
1230 executor_id: ExecutorId::generate(),
1231 },
1232 sim_clock.clone(),
1233 db_pool.clone(),
1234 parent_worker,
1235 sim_clock.now(),
1236 )
1237 .await;
1238
1239 let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1240 let child_execution_id = parent_execution_id.next_level(&join_set_id);
1241 {
1243 let child = CreateRequest {
1244 created_at: sim_clock.now(),
1245 execution_id: ExecutionId::Derived(child_execution_id.clone()),
1246 ffqn: FFQN_CHILD,
1247 params: Params::empty(),
1248 parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1249 metadata: concepts::ExecutionMetadata::empty(),
1250 scheduled_at: sim_clock.now(),
1251 retry_exp_backoff: Duration::ZERO,
1252 max_retries: 0,
1253 component_id: ComponentId::dummy_activity(),
1254 scheduled_by: None,
1255 };
1256 let current_time = sim_clock.now();
1257 let join_set = AppendRequest {
1258 created_at: current_time,
1259 event: ExecutionEventInner::HistoryEvent {
1260 event: HistoryEvent::JoinSetCreate {
1261 join_set_id: join_set_id.clone(),
1262 closing_strategy: ClosingStrategy::Complete,
1263 },
1264 },
1265 };
1266 let child_exec_req = AppendRequest {
1267 created_at: current_time,
1268 event: ExecutionEventInner::HistoryEvent {
1269 event: HistoryEvent::JoinSetRequest {
1270 join_set_id: join_set_id.clone(),
1271 request: JoinSetRequest::ChildExecutionRequest {
1272 child_execution_id: child_execution_id.clone(),
1273 },
1274 },
1275 },
1276 };
1277 let join_next = AppendRequest {
1278 created_at: current_time,
1279 event: ExecutionEventInner::HistoryEvent {
1280 event: HistoryEvent::JoinNext {
1281 join_set_id: join_set_id.clone(),
1282 run_expires_at: sim_clock.now(),
1283 closing: false,
1284 requested_ffqn: Some(FFQN_CHILD),
1285 },
1286 },
1287 };
1288 db_pool
1289 .connection()
1290 .append_batch_create_new_execution(
1291 current_time,
1292 vec![join_set, child_exec_req, join_next],
1293 parent_execution_id.clone(),
1294 Version::new(2),
1295 vec![child],
1296 )
1297 .await
1298 .unwrap();
1299 }
1300
1301 let child_worker =
1302 Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));
1303
1304 tick_fn(
1306 ExecConfig {
1307 batch_size: 1,
1308 lock_expiry: LOCK_EXPIRY,
1309 tick_sleep: Duration::ZERO,
1310 component_id: ComponentId::dummy_activity(),
1311 task_limiter: None,
1312 executor_id: ExecutorId::generate(),
1313 },
1314 sim_clock.clone(),
1315 db_pool.clone(),
1316 child_worker,
1317 sim_clock.now(),
1318 )
1319 .await;
1320 if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
1321 sim_clock.move_time_forward(LOCK_EXPIRY);
1323 expired_timers_watcher::tick(db_pool.connection().as_ref(), sim_clock.now())
1324 .await
1325 .unwrap();
1326 }
1327 let child_log = db_pool
1328 .connection()
1329 .get(&ExecutionId::Derived(child_execution_id.clone()))
1330 .await
1331 .unwrap();
1332 assert!(child_log.pending_state.is_finished());
1333 assert_eq!(
1334 Version(2),
1335 child_log.next_version,
1336 "created = 0, locked = 1, with_single_result = 2"
1337 );
1338 assert_eq!(
1339 ExecutionEventInner::Finished {
1340 result: SupportedFunctionReturnValue::ExecutionError(expected_child_err),
1341 http_client_traces: None
1342 },
1343 child_log.last_event().event
1344 );
1345 let parent_log = db_pool
1346 .connection()
1347 .get(&parent_execution_id)
1348 .await
1349 .unwrap();
1350 assert_matches!(
1351 parent_log.pending_state,
1352 PendingState::PendingAt {
1353 scheduled_at
1354 } if scheduled_at == sim_clock.now(),
1355 "parent should be back to pending"
1356 );
1357 let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1358 parent_log.responses.last(),
1359 Some(JoinSetResponseEventOuter{
1360 created_at: at,
1361 event: JoinSetResponseEvent{
1362 join_set_id: found_join_set_id,
1363 event: JoinSetResponse::ChildExecutionFinished {
1364 child_execution_id: found_child_execution_id,
1365 finished_version,
1366 result: found_result,
1367 }
1368 }
1369 })
1370 if *at == sim_clock.now()
1371 => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1372 );
1373 assert_eq!(join_set_id, *found_join_set_id);
1374 assert_eq!(child_execution_id, *found_child_execution_id);
1375 assert_eq!(child_log.next_version, *child_finished_version);
1376 assert_matches!(
1377 found_result,
1378 SupportedFunctionReturnValue::ExecutionError(_)
1379 );
1380
1381 db_pool.close().await.unwrap();
1382 }
1383
1384 #[derive(Clone, Debug)]
1385 struct SleepyWorker {
1386 duration: Duration,
1387 result: SupportedFunctionReturnValue,
1388 exported: [FunctionMetadata; 1],
1389 }
1390
1391 #[async_trait]
1392 impl Worker for SleepyWorker {
1393 async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1394 tokio::time::sleep(self.duration).await;
1395 WorkerResult::Ok(self.result.clone(), ctx.version, None)
1396 }
1397
1398 fn exported_functions(&self) -> &[FunctionMetadata] {
1399 &self.exported
1400 }
1401 }
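    // `SleepyWorker` sleeps past `lock_expiry` on purpose: the test below uses it to simulate a
    // worker that holds its lock too long, so the lock expires, `expired_timers_watcher::tick`
    // converts the expiry into `TemporarilyTimedOut`, and a second hang becomes a permanent
    // timeout once `max_retries` is spent.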
1402
1403 #[tokio::test]
1404 async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1405 set_up();
1406 let sim_clock = SimClock::default();
1407 let (_guard, db_pool) = Database::Memory.set_up().await;
1408 let lock_expiry = Duration::from_millis(100);
1409 let exec_config = ExecConfig {
1410 batch_size: 1,
1411 lock_expiry,
1412 tick_sleep: Duration::ZERO,
1413 component_id: ComponentId::dummy_activity(),
1414 task_limiter: None,
1415 executor_id: ExecutorId::generate(),
1416 };
1417
1418 let worker = Arc::new(SleepyWorker {
1419            duration: lock_expiry + Duration::from_millis(1),
1420            result: SUPPORTED_RETURN_VALUE_OK_EMPTY,
1421 exported: [FunctionMetadata {
1422 ffqn: FFQN_SOME,
1423 parameter_types: ParameterTypes::default(),
1424 return_type: RETURN_TYPE_DUMMY,
1425 extension: None,
1426 submittable: true,
1427 }],
1428 });
1429 let execution_id = ExecutionId::generate();
1431 let timeout_duration = Duration::from_millis(300);
1432 let db_connection = db_pool.connection();
1433 db_connection
1434 .create(CreateRequest {
1435 created_at: sim_clock.now(),
1436 execution_id: execution_id.clone(),
1437 ffqn: FFQN_SOME,
1438 params: Params::empty(),
1439 parent: None,
1440 metadata: concepts::ExecutionMetadata::empty(),
1441 scheduled_at: sim_clock.now(),
1442 retry_exp_backoff: timeout_duration,
1443 max_retries: 1,
1444 component_id: ComponentId::dummy_activity(),
1445 scheduled_by: None,
1446 })
1447 .await
1448 .unwrap();
1449
1450 let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
1451 let executor = ExecTask::new(
1452 worker,
1453 exec_config,
1454 sim_clock.clone(),
1455 db_pool.clone(),
1456 ffqns,
1457 );
1458 let mut first_execution_progress = executor
1459 .tick(sim_clock.now(), RunId::generate())
1460 .await
1461 .unwrap();
1462 assert_eq!(1, first_execution_progress.executions.len());
1463 sim_clock.move_time_forward(lock_expiry);
1465 let now_after_first_lock_expiry = sim_clock.now();
1467 {
1468 debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1469 let cleanup_progress = executor
1470 .tick(now_after_first_lock_expiry, RunId::generate())
1471 .await
1472 .unwrap();
1473 assert!(cleanup_progress.executions.is_empty());
1474 }
1475 {
1476 let expired_locks = expired_timers_watcher::tick(
1477 db_pool.connection().as_ref(),
1478 now_after_first_lock_expiry,
1479 )
1480 .await
1481 .unwrap()
1482 .expired_locks;
1483 assert_eq!(1, expired_locks);
1484 }
1485 assert!(
1486 !first_execution_progress
1487 .executions
1488 .pop()
1489 .unwrap()
1490 .1
1491 .is_finished()
1492 );
1493
1494 let execution_log = db_connection.get(&execution_id).await.unwrap();
1495 let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1496 assert_matches!(
1497 &execution_log.events.get(2).unwrap(),
1498 ExecutionEvent {
1499 event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
1500 created_at: at,
1501 backtrace_id: None,
1502 } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1503 );
1504 assert_eq!(
1505 PendingState::PendingAt {
1506 scheduled_at: expected_first_timeout_expiry
1507 },
1508 execution_log.pending_state
1509 );
1510 sim_clock.move_time_forward(timeout_duration);
1511 let now_after_first_timeout = sim_clock.now();
1512 debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1513
1514 let mut second_execution_progress = executor
1515 .tick(now_after_first_timeout, RunId::generate())
1516 .await
1517 .unwrap();
1518 assert_eq!(1, second_execution_progress.executions.len());
1519
1520 sim_clock.move_time_forward(lock_expiry);
1522 let now_after_second_lock_expiry = sim_clock.now();
1524 debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1525 {
1526 let cleanup_progress = executor
1527 .tick(now_after_second_lock_expiry, RunId::generate())
1528 .await
1529 .unwrap();
1530 assert!(cleanup_progress.executions.is_empty());
1531 }
1532 {
1533 let expired_locks = expired_timers_watcher::tick(
1534 db_pool.connection().as_ref(),
1535 now_after_second_lock_expiry,
1536 )
1537 .await
1538 .unwrap()
1539 .expired_locks;
1540 assert_eq!(1, expired_locks);
1541 }
1542 assert!(
1543 !second_execution_progress
1544 .executions
1545 .pop()
1546 .unwrap()
1547 .1
1548 .is_finished()
1549 );
1550
1551 drop(db_connection);
1552 drop(executor);
1553 db_pool.close().await.unwrap();
1554 }
1555}