1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::prefixed_ulid::RunId;
5use concepts::storage::{
6 AppendEventsToExecution, AppendRequest, AppendResponseToExecution, DbErrorGeneric,
7 DbErrorWrite, DbExecutor, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
8 LockedExecution,
9};
10use concepts::time::{ClockFn, Sleep};
11use concepts::{
12 ComponentId, ComponentRetryConfig, FunctionMetadata, StrVariant, SupportedFunctionReturnValue,
13};
14use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
15use concepts::{
16 FinishedExecutionError,
17 storage::{ExecutionEventInner, JoinSetResponse, Version},
18};
19use concepts::{JoinSetId, PermanentFailureKind};
20use std::fmt::Display;
21use std::{
22 sync::{
23 Arc,
24 atomic::{AtomicBool, Ordering},
25 },
26 time::Duration,
27};
28use tokio::task::{AbortHandle, JoinHandle};
29use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
30
/// Configuration of a single executor: lock timing, polling, batching, identity and retries.
#[derive(Debug, Clone)]
pub struct ExecConfig {
    /// Added to the tick timestamp to compute when the locks taken in that tick expire.
    pub lock_expiry: Duration,
    /// Duration handed to the `Sleep` implementation while the spawned executor waits for
    /// pending executions.
    pub tick_sleep: Duration,
    /// Upper bound on the number of executions requested from the database in one tick.
    pub batch_size: u32,
    /// Component this executor serves; passed to `lock_pending` and recorded in tracing spans.
    pub component_id: ComponentId,
    /// Optional semaphore: when set, a permit must be acquired for every execution locked in
    /// a tick and is held until the spawned worker task ends.
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
    /// Identity of this executor; passed to `lock_pending` and recorded in tracing spans.
    pub executor_id: ExecutorId,
    /// Retry settings (`max_retries`, `retry_exp_backoff`) used when a worker run fails.
    pub retry_config: ComponentRetryConfig,
}
41
/// Executor task: on every tick it locks pending executions of the functions in `ffqns`
/// and runs each of them on `worker` in its own tokio task.
pub struct ExecTask<C: ClockFn> {
    /// Worker that runs every locked execution.
    worker: Arc<dyn Worker>,
    /// Locking, batching and retry settings.
    config: ExecConfig,
    // `clock_fn` timestamps ticks and worker results; `db_exec` is the database handle used
    // for locking pending executions and appending the resulting events.
    clock_fn: C, db_exec: Arc<dyn DbExecutor>,
    /// Function names requested from the database when locking pending executions.
    ffqns: Arc<[FunctionFqn]>,
}
49
/// Outcome of a single tick: the executions that were locked, each paired with the join
/// handle of the tokio task running its worker.
#[derive(derive_more::Debug, Default)]
pub struct ExecutionProgress {
    // In this file the field is only read by `wait_for_tasks`, which is compiled with the
    // `test` feature; hence the `dead_code` allowance.
    #[debug(skip)]
    #[allow(dead_code)]
    executions: Vec<(ExecutionId, JoinHandle<()>)>,
}
56
57impl ExecutionProgress {
58 #[cfg(feature = "test")]
59 pub async fn wait_for_tasks(self) -> Vec<ExecutionId> {
60 let mut vec = Vec::new();
61 for (exe, join_handle) in self.executions {
62 vec.push(exe);
63 join_handle.await.unwrap();
64 }
65 vec
66 }
67}
68
/// Handle to an executor task spawned by `ExecTask::spawn_new`.
///
/// `close` asks the task to stop and waits for it; dropping the handle aborts the task
/// if it is still running.
#[derive(derive_more::Debug)]
pub struct ExecutorTaskHandle {
    /// Flag shared with the spawned task; the task's loop exits once it reads `true`.
    #[debug(skip)]
    is_closing: Arc<AtomicBool>,
    /// Abort handle of the spawned tokio task.
    #[debug(skip)]
    abort_handle: AbortHandle,
    component_id: ComponentId,
    executor_id: ExecutorId,
}
78
79impl ExecutorTaskHandle {
80 #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
81 pub async fn close(&self) {
82 trace!("Gracefully closing");
83 self.is_closing.store(true, Ordering::Relaxed);
91 while !self.abort_handle.is_finished() {
92 tokio::time::sleep(Duration::from_millis(1)).await;
93 }
94 debug!("Gracefully closed");
95 }
96}
97
98impl Drop for ExecutorTaskHandle {
99 #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
100 fn drop(&mut self) {
101 if self.abort_handle.is_finished() {
102 return;
103 }
104 warn!("Aborting the executor task");
105 self.abort_handle.abort();
106 }
107}
108
/// Public wrapper around `extract_exported_ffqns_noext`, compiled only with the `test` feature.
#[cfg(feature = "test")]
pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    extract_exported_ffqns_noext(worker)
}
113
114fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
115 worker
116 .exported_functions()
117 .iter()
118 .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
119 .collect::<Arc<_>>()
120}
121
122impl<C: ClockFn + 'static> ExecTask<C> {
123 #[cfg(feature = "test")]
124 pub fn new_test(
125 worker: Arc<dyn Worker>,
126 config: ExecConfig,
127 clock_fn: C,
128 db_exec: Arc<dyn DbExecutor>,
129 ffqns: Arc<[FunctionFqn]>,
130 ) -> Self {
131 Self {
132 worker,
133 config,
134 clock_fn,
135 db_exec,
136 ffqns,
137 }
138 }
139
140 #[cfg(feature = "test")]
141 pub fn new_all_ffqns_test(
142 worker: Arc<dyn Worker>,
143 config: ExecConfig,
144 clock_fn: C,
145 db_exec: Arc<dyn DbExecutor>,
146 ) -> Self {
147 let ffqns = extract_exported_ffqns_noext(worker.as_ref());
148 Self {
149 worker,
150 config,
151 clock_fn,
152 db_exec,
153 ffqns,
154 }
155 }
156
157 pub fn spawn_new(
158 worker: Arc<dyn Worker>,
159 config: ExecConfig,
160 clock_fn: C,
161 db_exec: Arc<dyn DbExecutor>,
162 sleep: impl Sleep + 'static,
163 ) -> ExecutorTaskHandle {
164 let is_closing = Arc::new(AtomicBool::default());
165 let is_closing_inner = is_closing.clone();
166 let ffqns = extract_exported_ffqns_noext(worker.as_ref());
167 let component_id = config.component_id.clone();
168 let executor_id = config.executor_id;
169 let abort_handle = tokio::spawn(async move {
170 debug!(executor_id = %config.executor_id, component_id = %config.component_id, "Spawned executor");
171 let task = ExecTask {
172 worker,
173 config,
174 db_exec,
175 ffqns: ffqns.clone(),
176 clock_fn: clock_fn.clone(),
177 };
178 while !is_closing_inner.load(Ordering::Relaxed) {
179 let _ = task.tick(clock_fn.now(), RunId::generate()).await;
180
181 task.db_exec
182 .wait_for_pending(clock_fn.now(), ffqns.clone(), {
183 let sleep = sleep.clone();
184 Box::pin(async move { sleep.sleep(task.config.tick_sleep).await })})
185 .await;
186 }
187 })
188 .abort_handle();
189 ExecutorTaskHandle {
190 is_closing,
191 abort_handle,
192 component_id,
193 executor_id,
194 }
195 }
196
197 fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
198 if let Some(task_limiter) = &self.config.task_limiter {
199 let mut locks = Vec::new();
200 for _ in 0..self.config.batch_size {
201 if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
202 locks.push(Some(permit));
203 } else {
204 break;
205 }
206 }
207 locks
208 } else {
209 let mut vec = Vec::with_capacity(self.config.batch_size as usize);
210 for _ in 0..self.config.batch_size {
211 vec.push(None);
212 }
213 vec
214 }
215 }
216
217 #[cfg(feature = "test")]
218 pub async fn tick_test(&self, executed_at: DateTime<Utc>, run_id: RunId) -> ExecutionProgress {
219 self.tick(executed_at, run_id).await.unwrap()
220 }
221
222 #[cfg(feature = "test")]
223 pub async fn tick_test_await(
224 &self,
225 executed_at: DateTime<Utc>,
226 run_id: RunId,
227 ) -> Vec<ExecutionId> {
228 self.tick(executed_at, run_id)
229 .await
230 .unwrap()
231 .wait_for_tasks()
232 .await
233 }
234
235 #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.config.executor_id, component_id = %self.config.component_id))]
236 async fn tick(
237 &self,
238 executed_at: DateTime<Utc>,
239 run_id: RunId,
240 ) -> Result<ExecutionProgress, DbErrorGeneric> {
241 let locked_executions = {
242 let mut permits = self.acquire_task_permits();
243 if permits.is_empty() {
244 return Ok(ExecutionProgress::default());
245 }
246 let lock_expires_at = executed_at + self.config.lock_expiry;
247 let locked_executions = self
248 .db_exec
249 .lock_pending(
250 permits.len(), executed_at, self.ffqns.clone(),
253 executed_at, self.config.component_id.clone(),
255 self.config.executor_id,
256 lock_expires_at,
257 run_id,
258 self.config.retry_config,
259 )
260 .await?;
261 while permits.len() > locked_executions.len() {
263 permits.pop();
264 }
265 assert_eq!(permits.len(), locked_executions.len());
266 locked_executions.into_iter().zip(permits)
267 };
268
269 let mut executions = Vec::with_capacity(locked_executions.len());
270 for (locked_execution, permit) in locked_executions {
271 let execution_id = locked_execution.execution_id.clone();
272 let join_handle = {
273 let worker = self.worker.clone();
274 let db = self.db_exec.clone();
275 let clock_fn = self.clock_fn.clone();
276 let worker_span = info_span!(parent: None, "worker",
277 "otel.name" = format!("worker {}", locked_execution.ffqn),
278 %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.config.executor_id, component_id = %self.config.component_id);
279 locked_execution.metadata.enrich(&worker_span);
280 tokio::spawn({
281 let worker_span2 = worker_span.clone();
282 let retry_config = self.config.retry_config;
283 async move {
284 let _permit = permit;
285 let res = Self::run_worker(
286 worker,
287 db.as_ref(),
288 clock_fn,
289 locked_execution,
290 retry_config,
291 worker_span2,
292 )
293 .await;
294 if let Err(db_error) = res {
295 error!("Got db error `{db_error:?}`, expecting watcher to mark execution as timed out");
296 }
297 }
298 .instrument(worker_span)
299 })
300 };
301 executions.push((execution_id, join_handle));
302 }
303 Ok(ExecutionProgress { executions })
304 }
305
306 async fn run_worker(
307 worker: Arc<dyn Worker>,
308 db_exec: &dyn DbExecutor,
309 clock_fn: C,
310 locked_execution: LockedExecution,
311 retry_config: ComponentRetryConfig,
312 worker_span: Span,
313 ) -> Result<(), DbErrorWrite> {
314 debug!("Worker::run starting");
315 trace!(
316 version = %locked_execution.next_version,
317 params = ?locked_execution.params,
318 event_history = ?locked_execution.event_history,
319 "Worker::run starting"
320 );
321 let can_be_retried = ExecutionLog::can_be_retried_after(
322 locked_execution.intermittent_event_count + 1,
323 retry_config.max_retries,
324 retry_config.retry_exp_backoff,
325 );
326 let unlock_expiry_on_limit_reached =
327 ExecutionLog::compute_retry_duration_when_retrying_forever(
328 locked_execution.intermittent_event_count + 1,
329 retry_config.retry_exp_backoff,
330 );
331 let ctx = WorkerContext {
332 execution_id: locked_execution.execution_id.clone(),
333 metadata: locked_execution.metadata,
334 ffqn: locked_execution.ffqn,
335 params: locked_execution.params,
336 event_history: locked_execution.event_history,
337 responses: locked_execution
338 .responses
339 .into_iter()
340 .map(|outer| outer.event)
341 .collect(),
342 version: locked_execution.next_version,
343 can_be_retried: can_be_retried.is_some(),
344 locked_event: locked_execution.locked_event,
345 worker_span,
346 };
347 let worker_result = worker.run(ctx).await;
348 trace!(?worker_result, "Worker::run finished");
349 let result_obtained_at = clock_fn.now();
350 match Self::worker_result_to_execution_event(
351 locked_execution.execution_id,
352 worker_result,
353 result_obtained_at,
354 locked_execution.parent,
355 can_be_retried,
356 unlock_expiry_on_limit_reached,
357 )? {
358 Some(append) => {
359 trace!("Appending {append:?}");
360 append.append(db_exec).await
361 }
362 None => Ok(()),
363 }
364 }
365
366 fn worker_result_to_execution_event(
368 execution_id: ExecutionId,
369 worker_result: WorkerResult,
370 result_obtained_at: DateTime<Utc>,
371 parent: Option<(ExecutionId, JoinSetId)>,
372 can_be_retried: Option<Duration>,
373 unlock_expiry_on_limit_reached: Duration,
374 ) -> Result<Option<Append>, DbErrorWrite> {
375 Ok(match worker_result {
376 WorkerResult::Ok(result, new_version, http_client_traces) => {
377 info!(
378 "Execution finished: {}",
379 result.as_pending_state_finished_result()
380 );
381 let child_finished =
382 parent.map(
383 |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
384 parent_execution_id,
385 parent_join_set,
386 result: result.clone(),
387 },
388 );
389 let primary_event = AppendRequest {
390 created_at: result_obtained_at,
391 event: ExecutionEventInner::Finished {
392 result,
393 http_client_traces,
394 },
395 };
396
397 Some(Append {
398 created_at: result_obtained_at,
399 primary_event,
400 execution_id,
401 version: new_version,
402 child_finished,
403 })
404 }
405 WorkerResult::DbUpdatedByWorkerOrWatcher => None,
406 WorkerResult::Err(err) => {
407 let reason_full = err.to_string(); let (primary_event, child_finished, version) = match err {
409 WorkerError::TemporaryTimeout {
410 http_client_traces,
411 version,
412 } => {
413 if let Some(duration) = can_be_retried {
414 let backoff_expires_at = result_obtained_at + duration;
415 info!(
416 "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
417 );
418 (
419 ExecutionEventInner::TemporarilyTimedOut {
420 backoff_expires_at,
421 http_client_traces,
422 },
423 None,
424 version,
425 )
426 } else {
427 info!("Permanent timeout");
428 let result = SupportedFunctionReturnValue::ExecutionError(
429 FinishedExecutionError::PermanentTimeout,
430 );
431 let child_finished =
432 parent.map(|(parent_execution_id, parent_join_set)| {
433 ChildFinishedResponse {
434 parent_execution_id,
435 parent_join_set,
436 result: result.clone(),
437 }
438 });
439 (
440 ExecutionEventInner::Finished {
441 result,
442 http_client_traces,
443 },
444 child_finished,
445 version,
446 )
447 }
448 }
449 WorkerError::DbError(db_error) => {
450 return Err(db_error);
451 }
452 WorkerError::ActivityTrap {
453 reason: reason_inner,
454 trap_kind,
455 detail,
456 version,
457 http_client_traces,
458 } => retry_or_fail(
459 result_obtained_at,
460 parent,
461 can_be_retried,
462 reason_full,
463 reason_inner,
464 trap_kind, detail,
466 version,
467 http_client_traces,
468 ),
469 WorkerError::ActivityPreopenedDirError {
470 reason_kind,
471 reason_inner,
472 version,
473 } => retry_or_fail(
474 result_obtained_at,
475 parent,
476 can_be_retried,
477 reason_full,
478 reason_inner,
479 reason_kind, None, version,
482 None, ),
484 WorkerError::ActivityReturnedError {
485 detail,
486 version,
487 http_client_traces,
488 } => {
489 let duration = can_be_retried.expect(
490 "ActivityReturnedError must not be returned when retries are exhausted",
491 );
492 let expires_at = result_obtained_at + duration;
493 debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
494 (
495 ExecutionEventInner::TemporarilyFailed {
496 backoff_expires_at: expires_at,
497 reason_full: StrVariant::Static("activity returned error"), reason_inner: StrVariant::Static("activity returned error"),
499 detail, http_client_traces,
501 },
502 None,
503 version,
504 )
505 }
506 WorkerError::LimitReached {
507 reason: inner_reason,
508 version: new_version,
509 } => {
510 let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
511 warn!(
512 "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
513 );
514 (
515 ExecutionEventInner::Unlocked {
516 backoff_expires_at: expires_at,
517 reason: StrVariant::from(reason_full),
518 },
519 None,
520 new_version,
521 )
522 }
523 WorkerError::FatalError(fatal_error, version) => {
524 warn!("Fatal worker error - {fatal_error:?}");
525 let result = SupportedFunctionReturnValue::ExecutionError(
526 FinishedExecutionError::from(fatal_error),
527 );
528 let child_finished =
529 parent.map(|(parent_execution_id, parent_join_set)| {
530 ChildFinishedResponse {
531 parent_execution_id,
532 parent_join_set,
533 result: result.clone(),
534 }
535 });
536 (
537 ExecutionEventInner::Finished {
538 result,
539 http_client_traces: None,
540 },
541 child_finished,
542 version,
543 )
544 }
545 };
546 Some(Append {
547 created_at: result_obtained_at,
548 primary_event: AppendRequest {
549 created_at: result_obtained_at,
550 event: primary_event,
551 },
552 execution_id,
553 version,
554 child_finished,
555 })
556 }
557 })
558 }
559}
560
561#[expect(clippy::too_many_arguments)]
562fn retry_or_fail(
563 result_obtained_at: DateTime<Utc>,
564 parent: Option<(ExecutionId, JoinSetId)>,
565 can_be_retried: Option<Duration>,
566 reason_full: String,
567 reason_inner: String,
568 err_kind_for_logs: impl Display,
569 detail: Option<String>,
570 version: Version,
571 http_client_traces: Option<Vec<concepts::storage::http_client_trace::HttpClientTrace>>,
572) -> (ExecutionEventInner, Option<ChildFinishedResponse>, Version) {
573 if let Some(duration) = can_be_retried {
574 let expires_at = result_obtained_at + duration;
575 debug!(
576 "Retrying activity with `{err_kind_for_logs}` execution after {duration:?} at {expires_at}"
577 );
578 (
579 ExecutionEventInner::TemporarilyFailed {
580 backoff_expires_at: expires_at,
581 reason_full: StrVariant::from(reason_full),
582 reason_inner: StrVariant::from(reason_inner),
583 detail,
584 http_client_traces,
585 },
586 None,
587 version,
588 )
589 } else {
590 info!("Activity with `{err_kind_for_logs}` marked as permanent failure - {reason_inner}");
591 let result = SupportedFunctionReturnValue::ExecutionError(
592 FinishedExecutionError::PermanentFailure {
593 reason_inner,
594 reason_full,
595 kind: PermanentFailureKind::ActivityTrap,
596 detail,
597 },
598 );
599
600 let child_finished =
601 parent.map(
602 |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
603 parent_execution_id,
604 parent_join_set,
605 result: result.clone(),
606 },
607 );
608 (
609 ExecutionEventInner::Finished {
610 result,
611 http_client_traces,
612 },
613 child_finished,
614 version,
615 )
616 }
617}
618
/// Data needed to tell a parent execution that one of its children has finished.
#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    /// Execution that receives the response.
    pub(crate) parent_execution_id: ExecutionId,
    /// Join set of the parent that the response is addressed to.
    pub(crate) parent_join_set: JoinSetId,
    /// Return value of the finished child.
    pub(crate) result: SupportedFunctionReturnValue,
}
625
/// A pending database write: one event for an execution, optionally accompanied by a
/// response for the parent execution.
#[derive(Debug, Clone)]
pub(crate) struct Append {
    /// Timestamp used for the parent response and for the batch write.
    pub(crate) created_at: DateTime<Utc>,
    /// Event appended to the execution identified by `execution_id`.
    pub(crate) primary_event: AppendRequest,
    pub(crate) execution_id: ExecutionId,
    /// Version passed to the database together with the event.
    pub(crate) version: Version,
    /// When set, `primary_event` must be a `Finished` event and the parent is notified
    /// in the same write.
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}
634
635impl Append {
636 pub(crate) async fn append(self, db_exec: &dyn DbExecutor) -> Result<(), DbErrorWrite> {
637 if let Some(child_finished) = self.child_finished {
638 assert_matches!(
639 &self.primary_event,
640 AppendRequest {
641 event: ExecutionEventInner::Finished { .. },
642 ..
643 }
644 );
645 let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
646 let events = AppendEventsToExecution {
647 execution_id: self.execution_id,
648 version: self.version.clone(),
649 batch: vec![self.primary_event],
650 };
651 let response = AppendResponseToExecution {
652 parent_execution_id: child_finished.parent_execution_id,
653 parent_response_event: JoinSetResponseEventOuter {
654 created_at: self.created_at,
655 event: JoinSetResponseEvent {
656 join_set_id: child_finished.parent_join_set,
657 event: JoinSetResponse::ChildExecutionFinished {
658 child_execution_id: derived,
659 finished_version: self.version,
661 result: child_finished.result,
662 },
663 },
664 },
665 };
666
667 db_exec
668 .append_batch_respond_to_parent(events, response, self.created_at)
669 .await?;
670 } else {
671 db_exec
672 .append(self.execution_id, self.version, self.primary_event)
673 .await?;
674 }
675 Ok(())
676 }
677}
678
/// Scripted worker for tests: replays prepared results and checks the context it is given.
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes, RETURN_TYPE_DUMMY,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    /// Function name exported by a `SimpleWorker` unless overridden with `with_ffqn`.
    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");

    /// Shared script: expected version mapped to the expected event history and the result
    /// to return. Entries are consumed from the end of the map.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    /// Worker that pops one scripted entry per `run` call.
    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    /// Builds the single-element export list advertised for `ffqn`.
    fn single_export(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
        let metadata = FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: RETURN_TYPE_DUMMY,
            extension: None,
            submittable: true,
        };
        [metadata]
    }

    impl SimpleWorker {
        /// Worker scripted with one result, expected at version 2 with an empty history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let script = IndexMap::from([(Version::new(2), (vec![], res))]);
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(script)))
        }

        /// Same script, but exporting `ffqn` instead of the current function name.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            let Self {
                worker_results_rev, ..
            } = self;
            Self {
                worker_results_rev,
                exported: single_export(ffqn.clone()),
                ffqn,
            }
        }

        /// Worker exporting `FFQN_SOME` that replays the given script.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: single_export(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        /// Pops the last scripted entry, asserts that the version and event history in
        /// `ctx` match it, and returns the scripted result. Panics when the script is empty.
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            let (expected_version, (expected_eh, worker_result)) =
                self.worker_results_rev.lock().unwrap().pop().unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}
758
759#[cfg(test)]
760mod tests {
761 use self::simple_worker::SimpleWorker;
762 use super::*;
763 use crate::{expired_timers_watcher, worker::WorkerResult};
764 use assert_matches::assert_matches;
765 use async_trait::async_trait;
766 use concepts::storage::{CreateRequest, DbConnection, JoinSetRequest};
767 use concepts::storage::{DbPoolCloseable, LockedBy};
768 use concepts::storage::{ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState};
769 use concepts::time::Now;
770 use concepts::{
771 ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, RETURN_TYPE_DUMMY,
772 SUPPORTED_RETURN_VALUE_OK_EMPTY, StrVariant, SupportedFunctionReturnValue, TrapKind,
773 };
774 use db_tests::Database;
775 use indexmap::IndexMap;
776 use simple_worker::FFQN_SOME;
777 use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
778 use test_utils::set_up;
779 use test_utils::sim_clock::{ConstClock, SimClock};
780
    // Function name distinct from `FFQN_SOME`; presumably used by the child-execution tests
    // further down in this module — TODO confirm against its usages.
    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
782
783 async fn tick_fn<W: Worker + Debug, C: ClockFn + 'static>(
784 config: ExecConfig,
785 clock_fn: C,
786 db_exec: Arc<dyn DbExecutor>,
787 worker: Arc<W>,
788 executed_at: DateTime<Utc>,
789 ) -> Vec<ExecutionId> {
790 trace!("Ticking with {worker:?}");
791 let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
792 let executor = ExecTask::new_test(worker, config, clock_fn, db_exec, ffqns);
793 executor
794 .tick_test_await(executed_at, RunId::generate())
795 .await
796 }
797
798 #[tokio::test]
799 async fn execute_simple_lifecycle_tick_based_mem() {
800 let created_at = Now.now();
801 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
802 execute_simple_lifecycle_tick_based(
803 db_pool.connection().as_ref(),
804 db_exec.clone(),
805 ConstClock(created_at),
806 )
807 .await;
808 db_close.close().await;
809 }
810
811 #[tokio::test]
812 async fn execute_simple_lifecycle_tick_based_sqlite() {
813 let created_at = Now.now();
814 let (_guard, db_pool, db_exec, db_close) = Database::Sqlite.set_up().await;
815 execute_simple_lifecycle_tick_based(
816 db_pool.connection().as_ref(),
817 db_exec.clone(),
818 ConstClock(created_at),
819 )
820 .await;
821 db_close.close().await;
822 }
823
824 async fn execute_simple_lifecycle_tick_based<C: ClockFn + 'static>(
825 db_connection: &dyn DbConnection,
826 db_exec: Arc<dyn DbExecutor>,
827 clock_fn: C,
828 ) {
829 set_up();
830 let created_at = clock_fn.now();
831 let exec_config = ExecConfig {
832 batch_size: 1,
833 lock_expiry: Duration::from_secs(1),
834 tick_sleep: Duration::from_millis(100),
835 component_id: ComponentId::dummy_activity(),
836 task_limiter: None,
837 executor_id: ExecutorId::generate(),
838 retry_config: ComponentRetryConfig::ZERO,
839 };
840
841 let execution_log = create_and_tick(
842 CreateAndTickConfig {
843 execution_id: ExecutionId::generate(),
844 created_at,
845 executed_at: created_at,
846 },
847 clock_fn,
848 db_connection,
849 db_exec,
850 exec_config,
851 Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
852 SUPPORTED_RETURN_VALUE_OK_EMPTY,
853 Version::new(2),
854 None,
855 ))),
856 tick_fn,
857 )
858 .await;
859 assert_matches!(
860 execution_log.events.get(2).unwrap(),
861 ExecutionEvent {
862 event: ExecutionEventInner::Finished {
863 result: SupportedFunctionReturnValue::Ok { ok: None },
864 http_client_traces: None
865 },
866 created_at: _,
867 backtrace_id: None,
868 }
869 );
870 }
871
872 #[tokio::test]
873 async fn execute_simple_lifecycle_task_based_mem() {
874 set_up();
875 let created_at = Now.now();
876 let clock_fn = ConstClock(created_at);
877 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
878 let exec_config = ExecConfig {
879 batch_size: 1,
880 lock_expiry: Duration::from_secs(1),
881 tick_sleep: Duration::ZERO,
882 component_id: ComponentId::dummy_activity(),
883 task_limiter: None,
884 executor_id: ExecutorId::generate(),
885 retry_config: ComponentRetryConfig::ZERO,
886 };
887
888 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
889 SUPPORTED_RETURN_VALUE_OK_EMPTY,
890 Version::new(2),
891 None,
892 )));
893
894 let execution_log = create_and_tick(
895 CreateAndTickConfig {
896 execution_id: ExecutionId::generate(),
897 created_at,
898 executed_at: created_at,
899 },
900 clock_fn,
901 db_pool.connection().as_ref(),
902 db_exec,
903 exec_config,
904 worker,
905 tick_fn,
906 )
907 .await;
908 assert_matches!(
909 execution_log.events.get(2).unwrap(),
910 ExecutionEvent {
911 event: ExecutionEventInner::Finished {
912 result: SupportedFunctionReturnValue::Ok { ok: None },
913 http_client_traces: None
914 },
915 created_at: _,
916 backtrace_id: None,
917 }
918 );
919 db_close.close().await;
920 }
921
    /// Input of `create_and_tick`.
    struct CreateAndTickConfig {
        /// Id of the execution to create.
        execution_id: ExecutionId,
        /// Creation timestamp; also used as the scheduled time of the execution.
        created_at: DateTime<Utc>,
        /// Timestamp passed to the tick function.
        executed_at: DateTime<Utc>,
    }
927
    /// Creates an execution of `FFQN_SOME`, invokes `tick` once and returns the execution log.
    ///
    /// Before returning, asserts that the log starts with a `Created` event stamped with
    /// `config.created_at`, followed by a `Locked` event that is not older than the creation,
    /// followed by a third event that is not older than the lock.
    async fn create_and_tick<
        W: Worker,
        C: ClockFn,
        T: FnMut(ExecConfig, C, Arc<dyn DbExecutor>, Arc<W>, DateTime<Utc>) -> F,
        F: Future<Output = Vec<ExecutionId>>,
    >(
        config: CreateAndTickConfig,
        clock_fn: C,
        db_connection: &dyn DbConnection,
        db_exec: Arc<dyn DbExecutor>,
        exec_config: ExecConfig,
        worker: Arc<W>,
        mut tick: T,
    ) -> ExecutionLog {
        // Create a top-level execution scheduled at its creation time.
        db_connection
            .create(CreateRequest {
                created_at: config.created_at,
                execution_id: config.execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: config.created_at,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();
        // Run the supplied tick function once.
        tick(exec_config, clock_fn, db_exec, worker, config.executed_at).await;
        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
        debug!("Execution history after tick: {execution_log:?}");
        // Event 0: `Created`, stamped with the requested creation time.
        assert_matches!(
            execution_log.events.first().unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Created { .. },
                created_at: actually_created_at,
                backtrace_id: None,
            }
            if config.created_at == *actually_created_at
        );
        // Event 1: `Locked`, not older than the creation.
        let locked_at = assert_matches!(
            execution_log.events.get(1).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: locked_at,
                backtrace_id: None,
            } if config.created_at <= *locked_at
            => *locked_at
        );
        // Event 2: whatever the worker produced, not older than the lock.
        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
            event: _,
            created_at: executed_at,
            backtrace_id: None,
        } if *executed_at >= locked_at);
        execution_log
    }
987
    /// With `max_retries: 1`, an activity trap must be recorded as `TemporarilyFailed`; the
    /// execution must not be picked up before the backoff expires and must finish
    /// successfully on the tick after it.
    #[tokio::test]
    async fn activity_trap_should_trigger_an_execution_retry() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
        let retry_exp_backoff = Duration::from_millis(100);
        let retry_config = ComponentRetryConfig {
            max_retries: 1,
            retry_exp_backoff,
        };
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
            retry_config,
        };
        let expected_reason = "error reason";
        let expected_detail = "error detail";
        // First attempt: the worker reports a trap.
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: Some(expected_detail.to_string()),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at: sim_clock.now(),
                executed_at: sim_clock.now(),
            },
            sim_clock.clone(),
            db_pool.connection().as_ref(),
            db_exec.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        assert_eq!(3, execution_log.events.len());
        {
            // The third event must be the temporary failure carrying both reasons, the
            // detail and a backoff equal to `retry_exp_backoff`.
            let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
                &execution_log.events.get(2).unwrap(),
                ExecutionEvent {
                    event: ExecutionEventInner::TemporarilyFailed {
                        reason_inner,
                        reason_full,
                        detail,
                        backoff_expires_at,
                        http_client_traces: None,
                    },
                    created_at: at,
                    backtrace_id: None,
                }
                => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
            );
            assert_eq!(expected_reason, reason_inner.deref());
            assert_eq!(
                format!("activity trap: {expected_reason}"),
                reason_full.deref()
            );
            assert_eq!(Some(expected_detail), detail.as_deref());
            assert_eq!(at, sim_clock.now());
            assert_eq!(sim_clock.now() + retry_config.retry_exp_backoff, expires_at);
        }
        // Second attempt: the worker succeeds and expects to be run at version 4.
        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
            std::sync::Mutex::new(IndexMap::from([(
                Version::new(4),
                (
                    vec![],
                    WorkerResult::Ok(SUPPORTED_RETURN_VALUE_OK_EMPTY, Version::new(4), None),
                ),
            )])),
        )));
        // The clock has not moved yet, so a tick must not run anything.
        assert!(
            tick_fn(
                exec_config.clone(),
                sim_clock.clone(),
                db_exec.clone(),
                worker.clone(),
                sim_clock.now(),
            )
            .await
            .is_empty()
        );
        // Advance the clock by the backoff and tick again.
        sim_clock.move_time_forward(retry_config.retry_exp_backoff);
        tick_fn(
            exec_config,
            sim_clock.clone(),
            db_exec.clone(),
            worker,
            sim_clock.now(),
        )
        .await;
        let execution_log = {
            let db_connection = db_pool.connection();
            db_connection
                .get(&execution_log.execution_id)
                .await
                .unwrap()
        };
        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
        // The retry must add a `Locked` and a successful `Finished` event at the current time.
        assert_matches!(
            execution_log.events.get(3).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: at,
                backtrace_id: None,
            } if *at == sim_clock.now()
        );
        assert_matches!(
            execution_log.events.get(4).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: SupportedFunctionReturnValue::Ok{ok:None},
                    http_client_traces: None
                },
                created_at: finished_at,
                backtrace_id: None,
            } if *finished_at == sim_clock.now()
        );
        db_close.close().await;
    }
1119
1120 #[tokio::test]
1121 async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1122 set_up();
1123 let created_at = Now.now();
1124 let clock_fn = ConstClock(created_at);
1125 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1126 let exec_config = ExecConfig {
1127 batch_size: 1,
1128 lock_expiry: Duration::from_secs(1),
1129 tick_sleep: Duration::ZERO,
1130 component_id: ComponentId::dummy_activity(),
1131 task_limiter: None,
1132 executor_id: ExecutorId::generate(),
1133 retry_config: ComponentRetryConfig::ZERO,
1134 };
1135
1136 let expected_reason = "error reason";
1137 let expected_detail = "error detail";
1138 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1139 WorkerError::ActivityTrap {
1140 reason: expected_reason.to_string(),
1141 trap_kind: concepts::TrapKind::Trap,
1142 detail: Some(expected_detail.to_string()),
1143 version: Version::new(2),
1144 http_client_traces: None,
1145 },
1146 )));
1147 let execution_log = create_and_tick(
1148 CreateAndTickConfig {
1149 execution_id: ExecutionId::generate(),
1150 created_at,
1151 executed_at: created_at,
1152 },
1153 clock_fn,
1154 db_pool.connection().as_ref(),
1155 db_exec,
1156 exec_config.clone(),
1157 worker,
1158 tick_fn,
1159 )
1160 .await;
1161 assert_eq!(3, execution_log.events.len());
1162 let (reason, kind, detail) = assert_matches!(
1163 &execution_log.events.get(2).unwrap(),
1164 ExecutionEvent {
1165 event: ExecutionEventInner::Finished{
1166 result: SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_}),
1167 http_client_traces: None
1168 },
1169 created_at: at,
1170 backtrace_id: None,
1171 } if *at == created_at
1172 => (reason_inner, kind, detail)
1173 );
1174 assert_eq!(expected_reason, *reason);
1175 assert_eq!(Some(expected_detail), detail.as_deref());
1176 assert_eq!(PermanentFailureKind::ActivityTrap, *kind);
1177
1178 db_close.close().await;
1179 }
1180
1181 #[tokio::test]
1182 async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1183 let worker_error = WorkerError::ActivityTrap {
1184 reason: "error reason".to_string(),
1185 trap_kind: TrapKind::Trap,
1186 detail: Some("detail".to_string()),
1187 version: Version::new(2),
1188 http_client_traces: None,
1189 };
1190 let expected_child_err = FinishedExecutionError::PermanentFailure {
1191 reason_full: "activity trap: error reason".to_string(),
1192 reason_inner: "error reason".to_string(),
1193 kind: PermanentFailureKind::ActivityTrap,
1194 detail: Some("detail".to_string()),
1195 };
1196 child_execution_permanently_failed_should_notify_parent(
1197 WorkerResult::Err(worker_error),
1198 expected_child_err,
1199 )
1200 .await;
1201 }
1202
1203 #[tokio::test]
1206 async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1207 let expected_child_err = FinishedExecutionError::PermanentTimeout;
1208 child_execution_permanently_failed_should_notify_parent(
1209 WorkerResult::DbUpdatedByWorkerOrWatcher,
1210 expected_child_err,
1211 )
1212 .await;
1213 }
1214
1215 async fn child_execution_permanently_failed_should_notify_parent(
1216 worker_result: WorkerResult,
1217 expected_child_err: FinishedExecutionError,
1218 ) {
1219 use concepts::storage::JoinSetResponseEventOuter;
1220 const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1221
1222 set_up();
1223 let sim_clock = SimClock::default();
1224 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1225
1226 let parent_worker = Arc::new(SimpleWorker::with_single_result(
1227 WorkerResult::DbUpdatedByWorkerOrWatcher,
1228 ));
1229 let parent_execution_id = ExecutionId::generate();
1230 db_pool
1231 .connection()
1232 .create(CreateRequest {
1233 created_at: sim_clock.now(),
1234 execution_id: parent_execution_id.clone(),
1235 ffqn: FFQN_SOME,
1236 params: Params::empty(),
1237 parent: None,
1238 metadata: concepts::ExecutionMetadata::empty(),
1239 scheduled_at: sim_clock.now(),
1240 component_id: ComponentId::dummy_activity(),
1241 scheduled_by: None,
1242 })
1243 .await
1244 .unwrap();
1245 let parent_executor_id = ExecutorId::generate();
1246 tick_fn(
1247 ExecConfig {
1248 batch_size: 1,
1249 lock_expiry: LOCK_EXPIRY,
1250 tick_sleep: Duration::ZERO,
1251 component_id: ComponentId::dummy_activity(),
1252 task_limiter: None,
1253 executor_id: parent_executor_id,
1254 retry_config: ComponentRetryConfig::ZERO,
1255 },
1256 sim_clock.clone(),
1257 db_exec.clone(),
1258 parent_worker,
1259 sim_clock.now(),
1260 )
1261 .await;
1262
1263 let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1264 let child_execution_id = parent_execution_id.next_level(&join_set_id);
1265 {
1267 let params = Params::empty();
1268 let child = CreateRequest {
1269 created_at: sim_clock.now(),
1270 execution_id: ExecutionId::Derived(child_execution_id.clone()),
1271 ffqn: FFQN_CHILD,
1272 params: params.clone(),
1273 parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1274 metadata: concepts::ExecutionMetadata::empty(),
1275 scheduled_at: sim_clock.now(),
1276 component_id: ComponentId::dummy_activity(),
1277 scheduled_by: None,
1278 };
1279 let current_time = sim_clock.now();
1280 let join_set = AppendRequest {
1281 created_at: current_time,
1282 event: ExecutionEventInner::HistoryEvent {
1283 event: HistoryEvent::JoinSetCreate {
1284 join_set_id: join_set_id.clone(),
1285 closing_strategy: ClosingStrategy::Complete,
1286 },
1287 },
1288 };
1289 let child_exec_req = AppendRequest {
1290 created_at: current_time,
1291 event: ExecutionEventInner::HistoryEvent {
1292 event: HistoryEvent::JoinSetRequest {
1293 join_set_id: join_set_id.clone(),
1294 request: JoinSetRequest::ChildExecutionRequest {
1295 child_execution_id: child_execution_id.clone(),
1296 target_ffqn: FFQN_CHILD,
1297 params,
1298 },
1299 },
1300 },
1301 };
1302 let join_next = AppendRequest {
1303 created_at: current_time,
1304 event: ExecutionEventInner::HistoryEvent {
1305 event: HistoryEvent::JoinNext {
1306 join_set_id: join_set_id.clone(),
1307 run_expires_at: sim_clock.now(),
1308 closing: false,
1309 requested_ffqn: Some(FFQN_CHILD),
1310 },
1311 },
1312 };
1313 db_pool
1314 .connection()
1315 .append_batch_create_new_execution(
1316 current_time,
1317 vec![join_set, child_exec_req, join_next],
1318 parent_execution_id.clone(),
1319 Version::new(2),
1320 vec![child],
1321 )
1322 .await
1323 .unwrap();
1324 }
1325
1326 let child_worker =
1327 Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));
1328
1329 tick_fn(
1331 ExecConfig {
1332 batch_size: 1,
1333 lock_expiry: LOCK_EXPIRY,
1334 tick_sleep: Duration::ZERO,
1335 component_id: ComponentId::dummy_activity(),
1336 task_limiter: None,
1337 executor_id: ExecutorId::generate(),
1338 retry_config: ComponentRetryConfig::ZERO,
1339 },
1340 sim_clock.clone(),
1341 db_exec.clone(),
1342 child_worker,
1343 sim_clock.now(),
1344 )
1345 .await;
1346 if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
1347 sim_clock.move_time_forward(LOCK_EXPIRY);
1349 expired_timers_watcher::tick(db_pool.connection().as_ref(), sim_clock.now())
1350 .await
1351 .unwrap();
1352 }
1353 let child_log = db_pool
1354 .connection()
1355 .get(&ExecutionId::Derived(child_execution_id.clone()))
1356 .await
1357 .unwrap();
1358 assert!(child_log.pending_state.is_finished());
1359 assert_eq!(
1360 Version(2),
1361 child_log.next_version,
1362 "created = 0, locked = 1, with_single_result = 2"
1363 );
1364 assert_eq!(
1365 ExecutionEventInner::Finished {
1366 result: SupportedFunctionReturnValue::ExecutionError(expected_child_err),
1367 http_client_traces: None
1368 },
1369 child_log.last_event().event
1370 );
1371 let parent_log = db_pool
1372 .connection()
1373 .get(&parent_execution_id)
1374 .await
1375 .unwrap();
1376 assert_matches!(
1377 parent_log.pending_state,
1378 PendingState::PendingAt {
1379 scheduled_at,
1380 last_lock: Some(LockedBy { executor_id: found_executor_id, run_id: _}),
1381 } if scheduled_at == sim_clock.now() && found_executor_id == parent_executor_id,
1382 "parent should be back to pending"
1383 );
1384 let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1385 parent_log.responses.last(),
1386 Some(JoinSetResponseEventOuter{
1387 created_at: at,
1388 event: JoinSetResponseEvent{
1389 join_set_id: found_join_set_id,
1390 event: JoinSetResponse::ChildExecutionFinished {
1391 child_execution_id: found_child_execution_id,
1392 finished_version,
1393 result: found_result,
1394 }
1395 }
1396 })
1397 if *at == sim_clock.now()
1398 => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1399 );
1400 assert_eq!(join_set_id, *found_join_set_id);
1401 assert_eq!(child_execution_id, *found_child_execution_id);
1402 assert_eq!(child_log.next_version, *child_finished_version);
1403 assert_matches!(
1404 found_result,
1405 SupportedFunctionReturnValue::ExecutionError(_)
1406 );
1407
1408 db_close.close().await;
1409 }
1410
1411 #[derive(Clone, Debug)]
1412 struct SleepyWorker {
1413 duration: Duration,
1414 result: SupportedFunctionReturnValue,
1415 exported: [FunctionMetadata; 1],
1416 }
1417
1418 #[async_trait]
1419 impl Worker for SleepyWorker {
1420 async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1421 tokio::time::sleep(self.duration).await;
1422 WorkerResult::Ok(self.result.clone(), ctx.version, None)
1423 }
1424
1425 fn exported_functions(&self) -> &[FunctionMetadata] {
1426 &self.exported
1427 }
1428 }
1429
1430 #[tokio::test]
1431 async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1432 set_up();
1433 let sim_clock = SimClock::default();
1434 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1435 let lock_expiry = Duration::from_millis(100);
1436 let timeout_duration = Duration::from_millis(300);
1437 let retry_config = ComponentRetryConfig {
1438 max_retries: 1,
1439 retry_exp_backoff: timeout_duration,
1440 };
1441 let exec_config = ExecConfig {
1442 batch_size: 1,
1443 lock_expiry,
1444 tick_sleep: Duration::ZERO,
1445 component_id: ComponentId::dummy_activity(),
1446 task_limiter: None,
1447 executor_id: ExecutorId::generate(),
1448 retry_config,
1449 };
1450
1451 let worker = Arc::new(SleepyWorker {
1452 duration: lock_expiry + Duration::from_millis(1), result: SUPPORTED_RETURN_VALUE_OK_EMPTY,
1454 exported: [FunctionMetadata {
1455 ffqn: FFQN_SOME,
1456 parameter_types: ParameterTypes::default(),
1457 return_type: RETURN_TYPE_DUMMY,
1458 extension: None,
1459 submittable: true,
1460 }],
1461 });
1462 let execution_id = ExecutionId::generate();
1464 let db_connection = db_pool.connection();
1465 db_connection
1466 .create(CreateRequest {
1467 created_at: sim_clock.now(),
1468 execution_id: execution_id.clone(),
1469 ffqn: FFQN_SOME,
1470 params: Params::empty(),
1471 parent: None,
1472 metadata: concepts::ExecutionMetadata::empty(),
1473 scheduled_at: sim_clock.now(),
1474 component_id: ComponentId::dummy_activity(),
1475 scheduled_by: None,
1476 })
1477 .await
1478 .unwrap();
1479
1480 let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
1481 let executor = ExecTask::new_test(
1482 worker,
1483 exec_config.clone(),
1484 sim_clock.clone(),
1485 db_exec.clone(),
1486 ffqns,
1487 );
1488 let mut first_execution_progress = executor
1489 .tick(sim_clock.now(), RunId::generate())
1490 .await
1491 .unwrap();
1492 assert_eq!(1, first_execution_progress.executions.len());
1493 sim_clock.move_time_forward(lock_expiry);
1495 let now_after_first_lock_expiry = sim_clock.now();
1497 {
1498 debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1499 let cleanup_progress = executor
1500 .tick(now_after_first_lock_expiry, RunId::generate())
1501 .await
1502 .unwrap();
1503 assert!(cleanup_progress.executions.is_empty());
1504 }
1505 {
1506 let expired_locks = expired_timers_watcher::tick(
1507 db_pool.connection().as_ref(),
1508 now_after_first_lock_expiry,
1509 )
1510 .await
1511 .unwrap()
1512 .expired_locks;
1513 assert_eq!(1, expired_locks);
1514 }
1515 assert!(
1516 !first_execution_progress
1517 .executions
1518 .pop()
1519 .unwrap()
1520 .1
1521 .is_finished()
1522 );
1523
1524 let execution_log = db_connection.get(&execution_id).await.unwrap();
1525 let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1526 assert_matches!(
1527 &execution_log.events.get(2).unwrap(),
1528 ExecutionEvent {
1529 event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
1530 created_at: at,
1531 backtrace_id: None,
1532 } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1533 );
1534 assert_matches!(
1535 execution_log.pending_state,
1536 PendingState::PendingAt {
1537 scheduled_at: found_scheduled_by,
1538 last_lock: Some(LockedBy {
1539 executor_id: found_executor_id,
1540 run_id: _,
1541 }),
1542 } if found_scheduled_by == expected_first_timeout_expiry && found_executor_id == exec_config.executor_id
1543 );
1544 sim_clock.move_time_forward(timeout_duration);
1545 let now_after_first_timeout = sim_clock.now();
1546 debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1547
1548 let mut second_execution_progress = executor
1549 .tick(now_after_first_timeout, RunId::generate())
1550 .await
1551 .unwrap();
1552 assert_eq!(1, second_execution_progress.executions.len());
1553
1554 sim_clock.move_time_forward(lock_expiry);
1556 let now_after_second_lock_expiry = sim_clock.now();
1558 debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1559 {
1560 let cleanup_progress = executor
1561 .tick(now_after_second_lock_expiry, RunId::generate())
1562 .await
1563 .unwrap();
1564 assert!(cleanup_progress.executions.is_empty());
1565 }
1566 {
1567 let expired_locks = expired_timers_watcher::tick(
1568 db_pool.connection().as_ref(),
1569 now_after_second_lock_expiry,
1570 )
1571 .await
1572 .unwrap()
1573 .expired_locks;
1574 assert_eq!(1, expired_locks);
1575 }
1576 assert!(
1577 !second_execution_progress
1578 .executions
1579 .pop()
1580 .unwrap()
1581 .1
1582 .is_finished()
1583 );
1584
1585 drop(db_connection);
1586 drop(executor);
1587 db_close.close().await;
1588 }
1589}