use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
use assert_matches::assert_matches;
use chrono::{DateTime, Utc};
use concepts::prefixed_ulid::RunId;
use concepts::storage::{
    AppendRequest, DbErrorGeneric, DbErrorWrite, DbExecutor, ExecutionLog, JoinSetResponseEvent,
    JoinSetResponseEventOuter, LockedExecution,
};
use concepts::time::{ClockFn, Sleep};
use concepts::{ComponentId, FunctionMetadata, StrVariant, SupportedFunctionReturnValue};
use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
use concepts::{
    FinishedExecutionError,
    storage::{ExecutionEventInner, JoinSetResponse, Version},
};
use concepts::{JoinSetId, PermanentFailureKind};
use std::fmt::Display;
use std::{
    sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    },
    time::Duration,
};
use tokio::task::{AbortHandle, JoinHandle};
use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};

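/// Configuration for a single executor task: how many executions are locked per
/// tick (`batch_size`), how long a lock is held (`lock_expiry`), how long to
/// sleep between ticks when nothing is pending (`tick_sleep`), and an optional
/// semaphore (`task_limiter`) capping concurrently running executions.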
#[derive(Debug, Clone)]
pub struct ExecConfig {
    pub lock_expiry: Duration,
    pub tick_sleep: Duration,
    pub batch_size: u32,
    pub component_id: ComponentId,
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
    pub executor_id: ExecutorId,
}

pub struct ExecTask<C: ClockFn> {
    worker: Arc<dyn Worker>,
    config: ExecConfig,
    clock_fn: C,
    db_exec: Arc<dyn DbExecutor>,
    ffqns: Arc<[FunctionFqn]>,
}

#[derive(derive_more::Debug, Default)]
pub struct ExecutionProgress {
    #[debug(skip)]
    #[allow(dead_code)]
    executions: Vec<(ExecutionId, JoinHandle<()>)>,
}

impl ExecutionProgress {
    #[cfg(feature = "test")]
    pub async fn wait_for_tasks(self) -> Vec<ExecutionId> {
        let mut vec = Vec::new();
        for (exe, join_handle) in self.executions {
            vec.push(exe);
            join_handle.await.unwrap();
        }
        vec
    }
}

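/// Handle to a spawned executor task. `close` requests a graceful shutdown and
/// waits until the task observes the flag and finishes; dropping the handle
/// without closing aborts the task.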
#[derive(derive_more::Debug)]
pub struct ExecutorTaskHandle {
    #[debug(skip)]
    is_closing: Arc<AtomicBool>,
    #[debug(skip)]
    abort_handle: AbortHandle,
    component_id: ComponentId,
    executor_id: ExecutorId,
}

impl ExecutorTaskHandle {
    #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
    pub async fn close(&self) {
        trace!("Gracefully closing");
        self.is_closing.store(true, Ordering::Relaxed);
        while !self.abort_handle.is_finished() {
            tokio::time::sleep(Duration::from_millis(1)).await;
        }
        debug!("Gracefully closed");
    }
}

impl Drop for ExecutorTaskHandle {
    #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
    fn drop(&mut self) {
        if self.abort_handle.is_finished() {
            return;
        }
        warn!("Aborting the executor task");
        self.abort_handle.abort();
    }
}

#[cfg(feature = "test")]
pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    extract_exported_ffqns_noext(worker)
}

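/// Collect the fully qualified function names exported by the worker.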
fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    worker
        .exported_functions()
        .iter()
        .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
        .collect::<Arc<_>>()
}

impl<C: ClockFn + 'static> ExecTask<C> {
    #[cfg(feature = "test")]
    pub fn new(
        worker: Arc<dyn Worker>,
        config: ExecConfig,
        clock_fn: C,
        db_exec: Arc<dyn DbExecutor>,
        ffqns: Arc<[FunctionFqn]>,
    ) -> Self {
        Self {
            worker,
            config,
            clock_fn,
            db_exec,
            ffqns,
        }
    }

    #[cfg(feature = "test")]
    pub fn new_all_ffqns_test(
        worker: Arc<dyn Worker>,
        config: ExecConfig,
        clock_fn: C,
        db_exec: Arc<dyn DbExecutor>,
    ) -> Self {
        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
        Self {
            worker,
            config,
            clock_fn,
            db_exec,
            ffqns,
        }
    }

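    /// Spawn the executor loop on a new Tokio task: repeatedly `tick`, then wait
    /// until the database reports pending work (or `tick_sleep` elapses), until
    /// the returned handle is closed or dropped.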
    pub fn spawn_new(
        worker: Arc<dyn Worker>,
        config: ExecConfig,
        clock_fn: C,
        db_exec: Arc<dyn DbExecutor>,
        sleep: impl Sleep + 'static,
    ) -> ExecutorTaskHandle {
        let is_closing = Arc::new(AtomicBool::default());
        let is_closing_inner = is_closing.clone();
        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
        let component_id = config.component_id.clone();
        let executor_id = config.executor_id;
        let abort_handle = tokio::spawn(async move {
            debug!(executor_id = %config.executor_id, component_id = %config.component_id, "Spawned executor");
            let task = ExecTask {
                worker,
                config,
                db_exec,
                ffqns: ffqns.clone(),
                clock_fn: clock_fn.clone(),
            };
            while !is_closing_inner.load(Ordering::Relaxed) {
                let _ = task.tick(clock_fn.now(), RunId::generate()).await;

                task.db_exec
                    .wait_for_pending(clock_fn.now(), ffqns.clone(), {
                        let sleep = sleep.clone();
                        Box::pin(async move { sleep.sleep(task.config.tick_sleep).await })
                    })
                    .await;
            }
        })
        .abort_handle();
        ExecutorTaskHandle {
            is_closing,
            abort_handle,
            component_id,
            executor_id,
        }
    }

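    /// Acquire up to `batch_size` permits from the optional `task_limiter`.
    /// Without a limiter, a vector of `None` placeholders of length `batch_size`
    /// is returned; the vector length bounds how many executions `tick` locks.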
    fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
        if let Some(task_limiter) = &self.config.task_limiter {
            let mut locks = Vec::new();
            for _ in 0..self.config.batch_size {
                if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
                    locks.push(Some(permit));
                } else {
                    break;
                }
            }
            locks
        } else {
            let mut vec = Vec::with_capacity(self.config.batch_size as usize);
            for _ in 0..self.config.batch_size {
                vec.push(None);
            }
            vec
        }
    }

    #[cfg(feature = "test")]
    pub async fn tick_test(&self, executed_at: DateTime<Utc>, run_id: RunId) -> ExecutionProgress {
        self.tick(executed_at, run_id).await.unwrap()
    }

    #[cfg(feature = "test")]
    pub async fn tick_test_await(
        &self,
        executed_at: DateTime<Utc>,
        run_id: RunId,
    ) -> Vec<ExecutionId> {
        self.tick(executed_at, run_id)
            .await
            .unwrap()
            .wait_for_tasks()
            .await
    }

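    /// Lock a batch of pending executions matching this executor's FFQNs and
    /// spawn one worker task per locked execution. Returns the spawned join
    /// handles wrapped in `ExecutionProgress`.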
    #[instrument(level = Level::TRACE, name = "executor.tick", skip_all, fields(executor_id = %self.config.executor_id, component_id = %self.config.component_id))]
    async fn tick(
        &self,
        executed_at: DateTime<Utc>,
        run_id: RunId,
    ) -> Result<ExecutionProgress, DbErrorGeneric> {
        let locked_executions = {
            let mut permits = self.acquire_task_permits();
            if permits.is_empty() {
                return Ok(ExecutionProgress::default());
            }
            let lock_expires_at = executed_at + self.config.lock_expiry;
            let locked_executions = self
                .db_exec
                .lock_pending(
                    permits.len(),
                    executed_at,
                    self.ffqns.clone(),
                    executed_at,
                    self.config.component_id.clone(),
                    self.config.executor_id,
                    lock_expires_at,
                    run_id,
                )
                .await?;
            while permits.len() > locked_executions.len() {
                permits.pop();
            }
            assert_eq!(permits.len(), locked_executions.len());
            locked_executions.into_iter().zip(permits)
        };
        let execution_deadline = executed_at + self.config.lock_expiry;

        let mut executions = Vec::with_capacity(locked_executions.len());
        for (locked_execution, permit) in locked_executions {
            let execution_id = locked_execution.execution_id.clone();
            let join_handle = {
                let worker = self.worker.clone();
                let db = self.db_exec.clone();
                let clock_fn = self.clock_fn.clone();
                let run_id = locked_execution.run_id;
                let worker_span = info_span!(parent: None, "worker",
                    "otel.name" = format!("worker {}", locked_execution.ffqn),
                    %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.config.executor_id, component_id = %self.config.component_id);
                locked_execution.metadata.enrich(&worker_span);
                tokio::spawn({
                    let worker_span2 = worker_span.clone();
                    async move {
                        let _permit = permit;
                        let res = Self::run_worker(
                            worker,
                            db.as_ref(),
                            execution_deadline,
                            clock_fn,
                            locked_execution,
                            worker_span2,
                        )
                        .await;
                        if let Err(db_error) = res {
                            error!("Got db error `{db_error:?}`, expecting watcher to mark execution as timed out");
                        }
                    }
                    .instrument(worker_span)
                })
            };
            executions.push((execution_id, join_handle));
        }
        Ok(ExecutionProgress { executions })
    }

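    /// Run the worker for a single locked execution and persist the outcome.
    /// Database errors are returned to the caller; the expired timers watcher is
    /// expected to eventually time the execution out in that case.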
    async fn run_worker(
        worker: Arc<dyn Worker>,
        db_exec: &dyn DbExecutor,
        execution_deadline: DateTime<Utc>,
        clock_fn: C,
        locked_execution: LockedExecution,
        worker_span: Span,
    ) -> Result<(), DbErrorWrite> {
        debug!("Worker::run starting");
        trace!(
            version = %locked_execution.next_version,
            params = ?locked_execution.params,
            event_history = ?locked_execution.event_history,
            "Worker::run starting"
        );
        let can_be_retried = ExecutionLog::can_be_retried_after(
            locked_execution.intermittent_event_count + 1,
            locked_execution.max_retries,
            locked_execution.retry_exp_backoff,
        );
        let unlock_expiry_on_limit_reached =
            ExecutionLog::compute_retry_duration_when_retrying_forever(
                locked_execution.intermittent_event_count + 1,
                locked_execution.retry_exp_backoff,
            );
        let ctx = WorkerContext {
            execution_id: locked_execution.execution_id.clone(),
            metadata: locked_execution.metadata,
            ffqn: locked_execution.ffqn,
            params: locked_execution.params,
            event_history: locked_execution.event_history,
            responses: locked_execution
                .responses
                .into_iter()
                .map(|outer| outer.event)
                .collect(),
            version: locked_execution.next_version,
            execution_deadline,
            can_be_retried: can_be_retried.is_some(),
            run_id: locked_execution.run_id,
            worker_span,
        };
        let worker_result = worker.run(ctx).await;
        trace!(?worker_result, "Worker::run finished");
        let result_obtained_at = clock_fn.now();
        match Self::worker_result_to_execution_event(
            locked_execution.execution_id,
            worker_result,
            result_obtained_at,
            locked_execution.parent,
            can_be_retried,
            unlock_expiry_on_limit_reached,
        )? {
            Some(append) => {
                trace!("Appending {append:?}");
                append.append(db_exec).await
            }
            None => Ok(()),
        }
    }

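    /// Map a `WorkerResult` to the event that should be appended, if any:
    /// successes and fatal errors finish the execution (notifying the parent for
    /// derived executions), temporary failures and timeouts schedule a retry
    /// while retries remain, and `DbUpdatedByWorkerOrWatcher` appends nothing.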
    fn worker_result_to_execution_event(
        execution_id: ExecutionId,
        worker_result: WorkerResult,
        result_obtained_at: DateTime<Utc>,
        parent: Option<(ExecutionId, JoinSetId)>,
        can_be_retried: Option<Duration>,
        unlock_expiry_on_limit_reached: Duration,
    ) -> Result<Option<Append>, DbErrorWrite> {
        Ok(match worker_result {
            WorkerResult::Ok(result, new_version, http_client_traces) => {
                info!(
                    "Execution finished: {}",
                    result.as_pending_state_finished_result()
                );
                let child_finished =
                    parent.map(
                        |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
                            parent_execution_id,
                            parent_join_set,
                            result: result.clone(),
                        },
                    );
                let primary_event = AppendRequest {
                    created_at: result_obtained_at,
                    event: ExecutionEventInner::Finished {
                        result,
                        http_client_traces,
                    },
                };

                Some(Append {
                    created_at: result_obtained_at,
                    primary_event,
                    execution_id,
                    version: new_version,
                    child_finished,
                })
            }
            WorkerResult::DbUpdatedByWorkerOrWatcher => None,
            WorkerResult::Err(err) => {
                let reason_full = err.to_string();
                let (primary_event, child_finished, version) = match err {
                    WorkerError::TemporaryTimeout {
                        http_client_traces,
                        version,
                    } => {
                        if let Some(duration) = can_be_retried {
                            let backoff_expires_at = result_obtained_at + duration;
                            info!(
                                "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
                            );
                            (
                                ExecutionEventInner::TemporarilyTimedOut {
                                    backoff_expires_at,
                                    http_client_traces,
                                },
                                None,
                                version,
                            )
                        } else {
                            info!("Permanent timeout");
                            let result = SupportedFunctionReturnValue::ExecutionError(
                                FinishedExecutionError::PermanentTimeout,
                            );
                            let child_finished =
                                parent.map(|(parent_execution_id, parent_join_set)| {
                                    ChildFinishedResponse {
                                        parent_execution_id,
                                        parent_join_set,
                                        result: result.clone(),
                                    }
                                });
                            (
                                ExecutionEventInner::Finished {
                                    result,
                                    http_client_traces,
                                },
                                child_finished,
                                version,
                            )
                        }
                    }
                    WorkerError::DbError(db_error) => {
                        return Err(db_error);
                    }
                    WorkerError::ActivityTrap {
                        reason: reason_inner,
                        trap_kind,
                        detail,
                        version,
                        http_client_traces,
                    } => retry_or_fail(
                        result_obtained_at,
                        parent,
                        can_be_retried,
                        reason_full,
                        reason_inner,
                        trap_kind,
                        detail,
                        version,
                        http_client_traces,
                    ),
                    WorkerError::ActivityPreopenedDirError {
                        reason_kind,
                        reason_inner,
                        version,
                    } => retry_or_fail(
                        result_obtained_at,
                        parent,
                        can_be_retried,
                        reason_full,
                        reason_inner,
                        reason_kind,
                        None,
                        version,
                        None,
                    ),
                    WorkerError::ActivityReturnedError {
                        detail,
                        version,
                        http_client_traces,
                    } => {
                        let duration = can_be_retried.expect(
                            "ActivityReturnedError must not be returned when retries are exhausted",
                        );
                        let expires_at = result_obtained_at + duration;
                        debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
                        (
                            ExecutionEventInner::TemporarilyFailed {
                                backoff_expires_at: expires_at,
                                reason_full: StrVariant::Static("activity returned error"),
                                reason_inner: StrVariant::Static("activity returned error"),
                                detail,
                                http_client_traces,
                            },
                            None,
                            version,
                        )
                    }
                    WorkerError::LimitReached {
                        reason: inner_reason,
                        version: new_version,
                    } => {
                        let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
                        warn!(
                            "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
                        );
                        (
                            ExecutionEventInner::Unlocked {
                                backoff_expires_at: expires_at,
                                reason: StrVariant::from(reason_full),
                            },
                            None,
                            new_version,
                        )
                    }
                    WorkerError::FatalError(fatal_error, version) => {
                        info!("Fatal worker error - {fatal_error:?}");
                        let result = SupportedFunctionReturnValue::ExecutionError(
                            FinishedExecutionError::from(fatal_error),
                        );
                        let child_finished =
                            parent.map(|(parent_execution_id, parent_join_set)| {
                                ChildFinishedResponse {
                                    parent_execution_id,
                                    parent_join_set,
                                    result: result.clone(),
                                }
                            });
                        (
                            ExecutionEventInner::Finished {
                                result,
                                http_client_traces: None,
                            },
                            child_finished,
                            version,
                        )
                    }
                };
                Some(Append {
                    created_at: result_obtained_at,
                    primary_event: AppendRequest {
                        created_at: result_obtained_at,
                        event: primary_event,
                    },
                    execution_id,
                    version,
                    child_finished,
                })
            }
        })
    }
}

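/// Decide between a temporary failure (when a retry is still allowed) and a
/// permanent failure that finishes the execution and, for child executions,
/// produces the response sent to the parent's join set.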
#[expect(clippy::too_many_arguments)]
fn retry_or_fail(
    result_obtained_at: DateTime<Utc>,
    parent: Option<(ExecutionId, JoinSetId)>,
    can_be_retried: Option<Duration>,
    reason_full: String,
    reason_inner: String,
    err_kind_for_logs: impl Display,
    detail: Option<String>,
    version: Version,
    http_client_traces: Option<Vec<concepts::storage::http_client_trace::HttpClientTrace>>,
) -> (ExecutionEventInner, Option<ChildFinishedResponse>, Version) {
    if let Some(duration) = can_be_retried {
        let expires_at = result_obtained_at + duration;
        debug!(
            "Retrying activity with `{err_kind_for_logs}` execution after {duration:?} at {expires_at}"
        );
        (
            ExecutionEventInner::TemporarilyFailed {
                backoff_expires_at: expires_at,
                reason_full: StrVariant::from(reason_full),
                reason_inner: StrVariant::from(reason_inner),
                detail,
                http_client_traces,
            },
            None,
            version,
        )
    } else {
        info!("Activity with `{err_kind_for_logs}` marked as permanent failure - {reason_inner}");
        let result = SupportedFunctionReturnValue::ExecutionError(
            FinishedExecutionError::PermanentFailure {
                reason_inner,
                reason_full,
                kind: PermanentFailureKind::ActivityTrap,
                detail,
            },
        );

        let child_finished =
            parent.map(
                |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
                    parent_execution_id,
                    parent_join_set,
                    result: result.clone(),
                },
            );
        (
            ExecutionEventInner::Finished {
                result,
                http_client_traces,
            },
            child_finished,
            version,
        )
    }
}

#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    pub(crate) parent_execution_id: ExecutionId,
    pub(crate) parent_join_set: JoinSetId,
    pub(crate) result: SupportedFunctionReturnValue,
}

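/// Pending database write produced from a worker result: the primary event for
/// this execution plus, for finished child executions, the response delivered
/// to the parent's join set.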
#[derive(Debug, Clone)]
pub(crate) struct Append {
    pub(crate) created_at: DateTime<Utc>,
    pub(crate) primary_event: AppendRequest,
    pub(crate) execution_id: ExecutionId,
    pub(crate) version: Version,
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}

impl Append {
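    /// Append the primary event. When a child finished, the event and the
    /// parent's `ChildExecutionFinished` response are written in a single batch
    /// via `append_batch_respond_to_parent`.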
    pub(crate) async fn append(self, db_exec: &dyn DbExecutor) -> Result<(), DbErrorWrite> {
        if let Some(child_finished) = self.child_finished {
            assert_matches!(
                &self.primary_event,
                AppendRequest {
                    event: ExecutionEventInner::Finished { .. },
                    ..
                }
            );
            let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
            db_exec
                .append_batch_respond_to_parent(
                    derived.clone(),
                    self.created_at,
                    vec![self.primary_event],
                    self.version.clone(),
                    child_finished.parent_execution_id,
                    JoinSetResponseEventOuter {
                        created_at: self.created_at,
                        event: JoinSetResponseEvent {
                            join_set_id: child_finished.parent_join_set,
                            event: JoinSetResponse::ChildExecutionFinished {
                                child_execution_id: derived,
                                finished_version: self.version,
                                result: child_finished.result,
                            },
                        },
                    },
                )
                .await?;
        } else {
            db_exec
                .append(self.execution_id, self.version, self.primary_event)
                .await?;
        }
        Ok(())
    }
}

#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes, RETURN_TYPE_DUMMY,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

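    /// Test worker that pops a pre-programmed `(version, (event history, result))`
    /// entry on every run and asserts that it matches the incoming context.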
    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    impl SimpleWorker {
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(IndexMap::from([(
                Version::new(2),
                (vec![], res),
            )]))))
        }

        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            Self {
                worker_results_rev: self.worker_results_rev,
                exported: [FunctionMetadata {
                    ffqn: ffqn.clone(),
                    parameter_types: ParameterTypes::default(),
                    return_type: RETURN_TYPE_DUMMY,
                    extension: None,
                    submittable: true,
                }],
                ffqn,
            }
        }

        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: [FunctionMetadata {
                    ffqn: FFQN_SOME,
                    parameter_types: ParameterTypes::default(),
                    return_type: RETURN_TYPE_DUMMY,
                    extension: None,
                    submittable: true,
                }],
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            let (expected_version, (expected_eh, worker_result)) =
                self.worker_results_rev.lock().unwrap().pop().unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}

#[cfg(test)]
mod tests {
    use self::simple_worker::SimpleWorker;
    use super::*;
    use crate::{expired_timers_watcher, worker::WorkerResult};
    use assert_matches::assert_matches;
    use async_trait::async_trait;
    use concepts::storage::DbPoolCloseable;
    use concepts::storage::{CreateRequest, DbConnection, JoinSetRequest};
    use concepts::storage::{ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState};
    use concepts::time::Now;
    use concepts::{
        ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, RETURN_TYPE_DUMMY,
        SUPPORTED_RETURN_VALUE_OK_EMPTY, StrVariant, SupportedFunctionReturnValue, TrapKind,
    };
    use db_tests::Database;
    use indexmap::IndexMap;
    use simple_worker::FFQN_SOME;
    use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
    use test_utils::set_up;
    use test_utils::sim_clock::{ConstClock, SimClock};

    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");

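    // Test helper: build an `ExecTask` for the worker's exported FFQNs, run a
    // single tick and wait for all spawned executions to finish.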
    async fn tick_fn<W: Worker + Debug, C: ClockFn + 'static>(
        config: ExecConfig,
        clock_fn: C,
        db_exec: Arc<dyn DbExecutor>,
        worker: Arc<W>,
        executed_at: DateTime<Utc>,
    ) -> Vec<ExecutionId> {
        trace!("Ticking with {worker:?}");
        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
        let executor = ExecTask::new(worker, config, clock_fn, db_exec, ffqns);
        executor
            .tick_test_await(executed_at, RunId::generate())
            .await
    }

    #[tokio::test]
    async fn execute_simple_lifecycle_tick_based_mem() {
        let created_at = Now.now();
        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
        execute_simple_lifecycle_tick_based(
            db_pool.connection().as_ref(),
            db_exec.clone(),
            ConstClock(created_at),
        )
        .await;
        db_close.close().await;
    }

    #[tokio::test]
    async fn execute_simple_lifecycle_tick_based_sqlite() {
        let created_at = Now.now();
        let (_guard, db_pool, db_exec, db_close) = Database::Sqlite.set_up().await;
        execute_simple_lifecycle_tick_based(
            db_pool.connection().as_ref(),
            db_exec.clone(),
            ConstClock(created_at),
        )
        .await;
        db_close.close().await;
    }

    async fn execute_simple_lifecycle_tick_based<C: ClockFn + 'static>(
        db_connection: &dyn DbConnection,
        db_exec: Arc<dyn DbExecutor>,
        clock_fn: C,
    ) {
        set_up();
        let created_at = clock_fn.now();
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::from_millis(100),
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
        };

        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            db_connection,
            db_exec,
            exec_config,
            Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
                SUPPORTED_RETURN_VALUE_OK_EMPTY,
                Version::new(2),
                None,
            ))),
            tick_fn,
        )
        .await;
        assert_matches!(
            execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: SupportedFunctionReturnValue::Ok { ok: None },
                    http_client_traces: None
                },
                created_at: _,
                backtrace_id: None,
            }
        );
    }

    #[tokio::test]
    async fn execute_simple_lifecycle_task_based_mem() {
        set_up();
        let created_at = Now.now();
        let clock_fn = ConstClock(created_at);
        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
        };

        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
            SUPPORTED_RETURN_VALUE_OK_EMPTY,
            Version::new(2),
            None,
        )));

        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            db_pool.connection().as_ref(),
            db_exec,
            exec_config,
            worker,
            tick_fn,
        )
        .await;
        assert_matches!(
            execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: SupportedFunctionReturnValue::Ok { ok: None },
                    http_client_traces: None
                },
                created_at: _,
                backtrace_id: None,
            }
        );
        db_close.close().await;
    }

    struct CreateAndTickConfig {
        execution_id: ExecutionId,
        created_at: DateTime<Utc>,
        max_retries: u32,
        executed_at: DateTime<Utc>,
        retry_exp_backoff: Duration,
    }

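    // Test helper: create an execution, run one tick and assert the expected
    // Created -> Locked -> <result> event sequence before returning the log.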
    async fn create_and_tick<
        W: Worker,
        C: ClockFn,
        T: FnMut(ExecConfig, C, Arc<dyn DbExecutor>, Arc<W>, DateTime<Utc>) -> F,
        F: Future<Output = Vec<ExecutionId>>,
    >(
        config: CreateAndTickConfig,
        clock_fn: C,
        db_connection: &dyn DbConnection,
        db_exec: Arc<dyn DbExecutor>,
        exec_config: ExecConfig,
        worker: Arc<W>,
        mut tick: T,
    ) -> ExecutionLog {
        db_connection
            .create(CreateRequest {
                created_at: config.created_at,
                execution_id: config.execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: config.created_at,
                retry_exp_backoff: config.retry_exp_backoff,
                max_retries: config.max_retries,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();
        tick(exec_config, clock_fn, db_exec, worker, config.executed_at).await;
        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
        debug!("Execution history after tick: {execution_log:?}");
        assert_matches!(
            execution_log.events.first().unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Created { .. },
                created_at: actually_created_at,
                backtrace_id: None,
            }
            if config.created_at == *actually_created_at
        );
        let locked_at = assert_matches!(
            execution_log.events.get(1).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: locked_at,
                backtrace_id: None,
            } if config.created_at <= *locked_at
            => *locked_at
        );
        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
            event: _,
            created_at: executed_at,
            backtrace_id: None,
        } if *executed_at >= locked_at);
        execution_log
    }

    #[tokio::test]
    async fn activity_trap_should_trigger_an_execution_retry() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
        };
        let expected_reason = "error reason";
        let expected_detail = "error detail";
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: Some(expected_detail.to_string()),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        let retry_exp_backoff = Duration::from_millis(100);
        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at: sim_clock.now(),
                max_retries: 1,
                executed_at: sim_clock.now(),
                retry_exp_backoff,
            },
            sim_clock.clone(),
            db_pool.connection().as_ref(),
            db_exec.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        assert_eq!(3, execution_log.events.len());
        {
            let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
                &execution_log.events.get(2).unwrap(),
                ExecutionEvent {
                    event: ExecutionEventInner::TemporarilyFailed {
                        reason_inner,
                        reason_full,
                        detail,
                        backoff_expires_at,
                        http_client_traces: None,
                    },
                    created_at: at,
                    backtrace_id: None,
                }
                => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
            );
            assert_eq!(expected_reason, reason_inner.deref());
            assert_eq!(
                format!("activity trap: {expected_reason}"),
                reason_full.deref()
            );
            assert_eq!(Some(expected_detail), detail.as_deref());
            assert_eq!(at, sim_clock.now());
            assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
        }
        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
            std::sync::Mutex::new(IndexMap::from([(
                Version::new(4),
                (
                    vec![],
                    WorkerResult::Ok(SUPPORTED_RETURN_VALUE_OK_EMPTY, Version::new(4), None),
                ),
            )])),
        )));
        assert!(
            tick_fn(
                exec_config.clone(),
                sim_clock.clone(),
                db_exec.clone(),
                worker.clone(),
                sim_clock.now(),
            )
            .await
            .is_empty()
        );
        sim_clock.move_time_forward(retry_exp_backoff);
        tick_fn(
            exec_config,
            sim_clock.clone(),
            db_exec.clone(),
            worker,
            sim_clock.now(),
        )
        .await;
        let execution_log = {
            let db_connection = db_pool.connection();
            db_connection
                .get(&execution_log.execution_id)
                .await
                .unwrap()
        };
        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
        assert_matches!(
            execution_log.events.get(3).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: at,
                backtrace_id: None,
            } if *at == sim_clock.now()
        );
        assert_matches!(
            execution_log.events.get(4).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: SupportedFunctionReturnValue::Ok { ok: None },
                    http_client_traces: None
                },
                created_at: finished_at,
                backtrace_id: None,
            } if *finished_at == sim_clock.now()
        );
        db_close.close().await;
    }

    #[tokio::test]
    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
        set_up();
        let created_at = Now.now();
        let clock_fn = ConstClock(created_at);
        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
        };

        let expected_reason = "error reason";
        let expected_detail = "error detail";
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: Some(expected_detail.to_string()),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            db_pool.connection().as_ref(),
            db_exec,
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        assert_eq!(3, execution_log.events.len());
        let (reason, kind, detail) = assert_matches!(
            &execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError::PermanentFailure { reason_inner, kind, detail, reason_full: _ }),
                    http_client_traces: None
                },
                created_at: at,
                backtrace_id: None,
            } if *at == created_at
            => (reason_inner, kind, detail)
        );
        assert_eq!(expected_reason, *reason);
        assert_eq!(Some(expected_detail), detail.as_deref());
        assert_eq!(PermanentFailureKind::ActivityTrap, *kind);

        db_close.close().await;
    }

    #[tokio::test]
    async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
        let worker_error = WorkerError::ActivityTrap {
            reason: "error reason".to_string(),
            trap_kind: TrapKind::Trap,
            detail: Some("detail".to_string()),
            version: Version::new(2),
            http_client_traces: None,
        };
        let expected_child_err = FinishedExecutionError::PermanentFailure {
            reason_full: "activity trap: error reason".to_string(),
            reason_inner: "error reason".to_string(),
            kind: PermanentFailureKind::ActivityTrap,
            detail: Some("detail".to_string()),
        };
        child_execution_permanently_failed_should_notify_parent(
            WorkerResult::Err(worker_error),
            expected_child_err,
        )
        .await;
    }

    #[tokio::test]
    async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
        let expected_child_err = FinishedExecutionError::PermanentTimeout;
        child_execution_permanently_failed_should_notify_parent(
            WorkerResult::DbUpdatedByWorkerOrWatcher,
            expected_child_err,
        )
        .await;
    }

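    // Shared body for the two child-notification tests: a parent execution spawns
    // a child via a one-off join set, the child finishes with the given worker
    // result (or is timed out by the watcher), and the parent's join-set response
    // plus pending state are verified.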
    async fn child_execution_permanently_failed_should_notify_parent(
        worker_result: WorkerResult,
        expected_child_err: FinishedExecutionError,
    ) {
        use concepts::storage::JoinSetResponseEventOuter;
        const LOCK_EXPIRY: Duration = Duration::from_secs(1);

        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;

        let parent_worker = Arc::new(SimpleWorker::with_single_result(
            WorkerResult::DbUpdatedByWorkerOrWatcher,
        ));
        let parent_execution_id = ExecutionId::generate();
        db_pool
            .connection()
            .create(CreateRequest {
                created_at: sim_clock.now(),
                execution_id: parent_execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: Duration::ZERO,
                max_retries: 0,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();
        tick_fn(
            ExecConfig {
                batch_size: 1,
                lock_expiry: LOCK_EXPIRY,
                tick_sleep: Duration::ZERO,
                component_id: ComponentId::dummy_activity(),
                task_limiter: None,
                executor_id: ExecutorId::generate(),
            },
            sim_clock.clone(),
            db_exec.clone(),
            parent_worker,
            sim_clock.now(),
        )
        .await;

        let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
        let child_execution_id = parent_execution_id.next_level(&join_set_id);
        {
            let child = CreateRequest {
                created_at: sim_clock.now(),
                execution_id: ExecutionId::Derived(child_execution_id.clone()),
                ffqn: FFQN_CHILD,
                params: Params::empty(),
                parent: Some((parent_execution_id.clone(), join_set_id.clone())),
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: Duration::ZERO,
                max_retries: 0,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            };
            let current_time = sim_clock.now();
            let join_set = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinSetCreate {
                        join_set_id: join_set_id.clone(),
                        closing_strategy: ClosingStrategy::Complete,
                    },
                },
            };
            let child_exec_req = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinSetRequest {
                        join_set_id: join_set_id.clone(),
                        request: JoinSetRequest::ChildExecutionRequest {
                            child_execution_id: child_execution_id.clone(),
                        },
                    },
                },
            };
            let join_next = AppendRequest {
                created_at: current_time,
                event: ExecutionEventInner::HistoryEvent {
                    event: HistoryEvent::JoinNext {
                        join_set_id: join_set_id.clone(),
                        run_expires_at: sim_clock.now(),
                        closing: false,
                        requested_ffqn: Some(FFQN_CHILD),
                    },
                },
            };
            db_pool
                .connection()
                .append_batch_create_new_execution(
                    current_time,
                    vec![join_set, child_exec_req, join_next],
                    parent_execution_id.clone(),
                    Version::new(2),
                    vec![child],
                )
                .await
                .unwrap();
        }

        let child_worker =
            Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));

        tick_fn(
            ExecConfig {
                batch_size: 1,
                lock_expiry: LOCK_EXPIRY,
                tick_sleep: Duration::ZERO,
                component_id: ComponentId::dummy_activity(),
                task_limiter: None,
                executor_id: ExecutorId::generate(),
            },
            sim_clock.clone(),
            db_exec.clone(),
            child_worker,
            sim_clock.now(),
        )
        .await;
        if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
            sim_clock.move_time_forward(LOCK_EXPIRY);
            expired_timers_watcher::tick(db_pool.connection().as_ref(), sim_clock.now())
                .await
                .unwrap();
        }
        let child_log = db_pool
            .connection()
            .get(&ExecutionId::Derived(child_execution_id.clone()))
            .await
            .unwrap();
        assert!(child_log.pending_state.is_finished());
        assert_eq!(
            Version(2),
            child_log.next_version,
            "created = 0, locked = 1, with_single_result = 2"
        );
        assert_eq!(
            ExecutionEventInner::Finished {
                result: SupportedFunctionReturnValue::ExecutionError(expected_child_err),
                http_client_traces: None
            },
            child_log.last_event().event
        );
        let parent_log = db_pool
            .connection()
            .get(&parent_execution_id)
            .await
            .unwrap();
        assert_matches!(
            parent_log.pending_state,
            PendingState::PendingAt {
                scheduled_at
            } if scheduled_at == sim_clock.now(),
            "parent should be back to pending"
        );
        let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
            parent_log.responses.last(),
            Some(JoinSetResponseEventOuter {
                created_at: at,
                event: JoinSetResponseEvent {
                    join_set_id: found_join_set_id,
                    event: JoinSetResponse::ChildExecutionFinished {
                        child_execution_id: found_child_execution_id,
                        finished_version,
                        result: found_result,
                    }
                }
            })
            if *at == sim_clock.now()
            => (found_join_set_id, found_child_execution_id, finished_version, found_result)
        );
        assert_eq!(join_set_id, *found_join_set_id);
        assert_eq!(child_execution_id, *found_child_execution_id);
        assert_eq!(child_log.next_version, *child_finished_version);
        assert_matches!(
            found_result,
            SupportedFunctionReturnValue::ExecutionError(_)
        );

        db_close.close().await;
    }

    #[derive(Clone, Debug)]
    struct SleepyWorker {
        duration: Duration,
        result: SupportedFunctionReturnValue,
        exported: [FunctionMetadata; 1],
    }

    #[async_trait]
    impl Worker for SleepyWorker {
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            tokio::time::sleep(self.duration).await;
            WorkerResult::Ok(self.result.clone(), ctx.version, None)
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }

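    // A worker that sleeps past `lock_expiry` leaves a hanging lock: the executor's
    // tick finds nothing to do and the expired timers watcher marks the lock as
    // expired, temporarily timing the execution out until retries are exhausted.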
    #[tokio::test]
    async fn hanging_lock_should_be_cleaned_and_execution_retried() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
        let lock_expiry = Duration::from_millis(100);
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry,
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
        };

        let worker = Arc::new(SleepyWorker {
            duration: lock_expiry + Duration::from_millis(1),
            result: SUPPORTED_RETURN_VALUE_OK_EMPTY,
            exported: [FunctionMetadata {
                ffqn: FFQN_SOME,
                parameter_types: ParameterTypes::default(),
                return_type: RETURN_TYPE_DUMMY,
                extension: None,
                submittable: true,
            }],
        });
        let execution_id = ExecutionId::generate();
        let timeout_duration = Duration::from_millis(300);
        let db_connection = db_pool.connection();
        db_connection
            .create(CreateRequest {
                created_at: sim_clock.now(),
                execution_id: execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: timeout_duration,
                max_retries: 1,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();

        let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
        let executor = ExecTask::new(
            worker,
            exec_config,
            sim_clock.clone(),
            db_exec.clone(),
            ffqns,
        );
        let mut first_execution_progress = executor
            .tick(sim_clock.now(), RunId::generate())
            .await
            .unwrap();
        assert_eq!(1, first_execution_progress.executions.len());
        sim_clock.move_time_forward(lock_expiry);
        let now_after_first_lock_expiry = sim_clock.now();
        {
            debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
            let cleanup_progress = executor
                .tick(now_after_first_lock_expiry, RunId::generate())
                .await
                .unwrap();
            assert!(cleanup_progress.executions.is_empty());
        }
        {
            let expired_locks = expired_timers_watcher::tick(
                db_pool.connection().as_ref(),
                now_after_first_lock_expiry,
            )
            .await
            .unwrap()
            .expired_locks;
            assert_eq!(1, expired_locks);
        }
        assert!(
            !first_execution_progress
                .executions
                .pop()
                .unwrap()
                .1
                .is_finished()
        );

        let execution_log = db_connection.get(&execution_id).await.unwrap();
        let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
        assert_matches!(
            &execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
                created_at: at,
                backtrace_id: None,
            } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
        );
        assert_eq!(
            PendingState::PendingAt {
                scheduled_at: expected_first_timeout_expiry
            },
            execution_log.pending_state
        );
        sim_clock.move_time_forward(timeout_duration);
        let now_after_first_timeout = sim_clock.now();
        debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");

        let mut second_execution_progress = executor
            .tick(now_after_first_timeout, RunId::generate())
            .await
            .unwrap();
        assert_eq!(1, second_execution_progress.executions.len());

        sim_clock.move_time_forward(lock_expiry);
        let now_after_second_lock_expiry = sim_clock.now();
        debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
        {
            let cleanup_progress = executor
                .tick(now_after_second_lock_expiry, RunId::generate())
                .await
                .unwrap();
            assert!(cleanup_progress.executions.is_empty());
        }
        {
            let expired_locks = expired_timers_watcher::tick(
                db_pool.connection().as_ref(),
                now_after_second_lock_expiry,
            )
            .await
            .unwrap()
            .expired_locks;
            assert_eq!(1, expired_locks);
        }
        assert!(
            !second_execution_progress
                .executions
                .pop()
                .unwrap()
                .1
                .is_finished()
        );

        drop(db_connection);
        drop(executor);
        db_close.close().await;
    }
}