1use crate::worker::{FatalError, Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::prefixed_ulid::RunId;
5use concepts::storage::{
6 AppendEventsToExecution, AppendRequest, AppendResponseToExecution, DbErrorGeneric,
7 DbErrorWrite, DbExecutor, ExecutionLog, LockedExecution,
8};
9use concepts::time::{ClockFn, Sleep};
10use concepts::{
11 ComponentId, ComponentRetryConfig, FunctionMetadata, StrVariant, SupportedFunctionReturnValue,
12};
13use concepts::{ExecutionFailureKind, JoinSetId};
14use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
15use concepts::{
16 FinishedExecutionError,
17 storage::{ExecutionRequest, Version},
18};
19use std::{
20 sync::{
21 Arc,
22 atomic::{AtomicBool, Ordering},
23 },
24 time::Duration,
25};
26use tokio::task::{AbortHandle, JoinHandle};
27use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
28
/// Configuration of a single executor.
#[derive(Debug, Clone)]
pub struct ExecConfig {
    /// Added to the tick timestamp to compute the lock expiry passed to the locking calls.
    pub lock_expiry: Duration,
    /// Duration handed to the `Sleep` implementation while waiting for pending executions.
    pub tick_sleep: Duration,
    /// Upper bound on the number of task permits acquired per tick.
    pub batch_size: u32,
    /// Component identity passed to the locking and waiting database calls.
    pub component_id: ComponentId,
    /// Optional semaphore; when set, one permit is acquired without blocking for each
    /// execution slot and held by the spawned worker task.
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
    /// Identity of this executor, passed to the locking calls and recorded in tracing fields.
    pub executor_id: ExecutorId,
    /// Retry settings passed to the locking calls and used to decide whether a failed run
    /// can be retried.
    pub retry_config: ComponentRetryConfig,
    /// Selects whether pending executions are locked by FFQNs or by component id.
    pub locking_strategy: LockingStrategy,
}
40
/// Executor state: locks pending executions in the database and runs each of them
/// on the worker in its own spawned task.
pub struct ExecTask<C: ClockFn> {
    /// Worker that runs the locked executions.
    worker: Arc<dyn Worker>,
    config: ExecConfig,
    /// Clock used to timestamp the result obtained from the worker.
    clock_fn: C,
    /// Database handle used for locking executions and appending their events.
    db_exec: Arc<dyn DbExecutor>,
    /// Strategy derived from `config.locking_strategy`, carrying the FFQNs when needed.
    locking_strategy_holder: LockingStrategyHolder,
}
48
/// Outcome of one tick: the executions that were locked and the tasks running them.
#[derive(derive_more::Debug, Default)]
pub struct ExecutionProgress {
    /// Execution id paired with the join handle of its spawned worker task.
    // Only read by `wait_for_tasks`, which is compiled with the `test` feature.
    #[debug(skip)]
    #[allow(dead_code)]
    executions: Vec<(ExecutionId, JoinHandle<()>)>,
}
55
56impl ExecutionProgress {
57 #[cfg(feature = "test")]
58 pub async fn wait_for_tasks(self) -> Vec<ExecutionId> {
59 let mut vec = Vec::new();
60 for (exe, join_handle) in self.executions {
61 vec.push(exe);
62 join_handle.await.unwrap();
63 }
64 vec
65 }
66}
67
/// Handle to a spawned executor task; dropping it aborts the task if it is still running.
#[derive(derive_more::Debug)]
pub struct ExecutorTaskHandle {
    /// Flag polled by the executor loop; set by `close` to request a graceful stop.
    #[debug(skip)]
    is_closing: Arc<AtomicBool>,
    /// Handle of the spawned executor task, used to detect completion and to abort it.
    #[debug(skip)]
    abort_handle: AbortHandle,
    component_id: ComponentId,
    executor_id: ExecutorId,
}
77
78impl ExecutorTaskHandle {
79 #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
80 pub async fn close(&self) {
81 trace!("Gracefully closing");
82 self.is_closing.store(true, Ordering::Relaxed);
90 while !self.abort_handle.is_finished() {
91 tokio::time::sleep(Duration::from_millis(1)).await;
92 }
93 debug!("Gracefully closed");
94 }
95}
96
97impl Drop for ExecutorTaskHandle {
98 #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
99 fn drop(&mut self) {
100 if self.abort_handle.is_finished() {
101 return;
102 }
103 warn!("Aborting the executor task");
104 self.abort_handle.abort();
105 }
106}
107
/// Public wrapper around `extract_exported_ffqns_noext`, compiled only with the `test` feature.
#[cfg(feature = "test")]
pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    extract_exported_ffqns_noext(worker)
}
112
113fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
114 worker
115 .exported_functions()
116 .iter()
117 .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
118 .collect::<Arc<_>>()
119}
120
/// How the executor selects pending executions to lock.
#[derive(Debug, Default, Clone, Copy)]
pub enum LockingStrategy {
    /// Lock by a list of FFQNs (the default).
    #[default]
    ByFfqns,
    /// Lock by the executor's component id.
    ByComponentId,
}
127impl LockingStrategy {
128 fn holder(&self, ffqns: Arc<[FunctionFqn]>) -> LockingStrategyHolder {
129 match self {
130 LockingStrategy::ByFfqns => LockingStrategyHolder::ByFfqns(ffqns),
131 LockingStrategy::ByComponentId => LockingStrategyHolder::ByComponentId,
132 }
133 }
134}
135
/// `LockingStrategy` together with the data needed to apply it.
enum LockingStrategyHolder {
    /// Lock pending executions matching any of these FFQNs.
    ByFfqns(Arc<[FunctionFqn]>),
    /// Lock pending executions by the configured component id.
    ByComponentId,
}
140
141impl<C: ClockFn + 'static> ExecTask<C> {
142 #[cfg(feature = "test")]
143 pub fn new_test(
144 worker: Arc<dyn Worker>,
145 config: ExecConfig,
146 clock_fn: C,
147 db_exec: Arc<dyn DbExecutor>,
148 ffqns: Arc<[FunctionFqn]>,
149 ) -> Self {
150 Self {
151 worker,
152 locking_strategy_holder: config.locking_strategy.holder(ffqns),
153 config,
154 clock_fn,
155 db_exec,
156 }
157 }
158
159 #[cfg(feature = "test")]
160 pub fn new_all_ffqns_test(
161 worker: Arc<dyn Worker>,
162 config: ExecConfig,
163 clock_fn: C,
164 db_exec: Arc<dyn DbExecutor>,
165 ) -> Self {
166 let ffqns = extract_exported_ffqns_noext(worker.as_ref());
167 Self {
168 worker,
169 locking_strategy_holder: config.locking_strategy.holder(ffqns),
170 config,
171 clock_fn,
172 db_exec,
173 }
174 }
175
176 pub fn spawn_new(
177 worker: Arc<dyn Worker>,
178 config: ExecConfig,
179 clock_fn: C,
180 db_exec: Arc<dyn DbExecutor>,
181 sleep: impl Sleep + 'static,
182 ) -> ExecutorTaskHandle {
183 let is_closing = Arc::new(AtomicBool::default());
184 let is_closing_inner = is_closing.clone();
185 let ffqns = extract_exported_ffqns_noext(worker.as_ref());
186 let component_id = config.component_id.clone();
187 let executor_id = config.executor_id;
188 let abort_handle = tokio::spawn(async move {
189 debug!(executor_id = %config.executor_id, component_id = %config.component_id, "Spawned executor");
190 let lock_strategy_holder = config.locking_strategy.holder(ffqns);
191 let task = ExecTask {
192 worker,
193 config,
194 db_exec,
195 locking_strategy_holder: lock_strategy_holder,
196 clock_fn: clock_fn.clone(),
197 };
198 while !is_closing_inner.load(Ordering::Relaxed) {
199 let _ = task.tick(clock_fn.now(), RunId::generate()).await;
200
201 task.db_exec
202 .wait_for_pending_by_component_id(clock_fn.now(), &task.config.component_id, {
203 let sleep = sleep.clone();
204 Box::pin(async move { sleep.sleep(task.config.tick_sleep).await })})
205 .await;
206 }
207 })
208 .abort_handle();
209 ExecutorTaskHandle {
210 is_closing,
211 abort_handle,
212 component_id,
213 executor_id,
214 }
215 }
216
217 fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
218 if let Some(task_limiter) = &self.config.task_limiter {
219 let mut locks = Vec::new();
220 for _ in 0..self.config.batch_size {
221 if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
222 locks.push(Some(permit));
223 } else {
224 break;
225 }
226 }
227 locks
228 } else {
229 let mut vec = Vec::with_capacity(self.config.batch_size as usize);
230 for _ in 0..self.config.batch_size {
231 vec.push(None);
232 }
233 vec
234 }
235 }
236
 /// Test-only entry point to `tick`; panics if the tick returns an error.
 #[cfg(feature = "test")]
 pub async fn tick_test(&self, executed_at: DateTime<Utc>, run_id: RunId) -> ExecutionProgress {
     self.tick(executed_at, run_id).await.unwrap()
 }
241
242 #[cfg(feature = "test")]
243 pub async fn tick_test_await(
244 &self,
245 executed_at: DateTime<Utc>,
246 run_id: RunId,
247 ) -> Vec<ExecutionId> {
248 self.tick(executed_at, run_id)
249 .await
250 .unwrap()
251 .wait_for_tasks()
252 .await
253 }
254
255 #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.config.executor_id, component_id = %self.config.component_id))]
256 async fn tick(
257 &self,
258 executed_at: DateTime<Utc>,
259 run_id: RunId,
260 ) -> Result<ExecutionProgress, DbErrorGeneric> {
261 let locked_executions = {
262 let mut permits = self.acquire_task_permits();
263 if permits.is_empty() {
264 return Ok(ExecutionProgress::default());
265 }
266 let lock_expires_at = executed_at + self.config.lock_expiry;
267 let locked_executions = match &self.locking_strategy_holder {
268 LockingStrategyHolder::ByFfqns(ffqns) => {
269 self.db_exec
270 .lock_pending_by_ffqns(
271 permits.len(), executed_at, ffqns.clone(),
274 executed_at, self.config.component_id.clone(),
276 self.config.executor_id,
277 lock_expires_at,
278 run_id,
279 self.config.retry_config,
280 )
281 .await?
282 }
283 LockingStrategyHolder::ByComponentId => {
284 self.db_exec
285 .lock_pending_by_component_id(
286 permits.len(), executed_at, &self.config.component_id,
289 executed_at, self.config.executor_id,
291 lock_expires_at,
292 run_id,
293 self.config.retry_config,
294 )
295 .await?
296 }
297 };
298 while permits.len() > locked_executions.len() {
300 permits.pop();
301 }
302 assert_eq!(permits.len(), locked_executions.len());
303 locked_executions.into_iter().zip(permits)
304 };
305
306 let mut executions = Vec::with_capacity(locked_executions.len());
307 for (locked_execution, permit) in locked_executions {
308 let execution_id = locked_execution.execution_id.clone();
309 let join_handle = {
310 let worker = self.worker.clone();
311 let db = self.db_exec.clone();
312 let clock_fn = self.clock_fn.clone();
313 let worker_span = info_span!(parent: None, "worker",
314 "otel.name" = format!("worker {}", locked_execution.ffqn),
315 %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.config.executor_id, component_id = %self.config.component_id);
316 locked_execution.metadata.enrich(&worker_span);
317 tokio::spawn({
318 let worker_span2 = worker_span.clone();
319 let retry_config = self.config.retry_config;
320 async move {
321 let _permit = permit;
322 let res = Self::run_worker(
323 worker,
324 db.as_ref(),
325 clock_fn,
326 locked_execution,
327 retry_config,
328 worker_span2,
329 )
330 .await;
331 if let Err(db_error) = res {
332 error!("Got db error `{db_error:?}`, expecting watcher to mark execution as timed out");
333 }
334 }
335 .instrument(worker_span)
336 })
337 };
338 executions.push((execution_id, join_handle));
339 }
340 Ok(ExecutionProgress { executions })
341 }
342
343 async fn run_worker(
344 worker: Arc<dyn Worker>,
345 db_exec: &dyn DbExecutor,
346 clock_fn: C,
347 locked_execution: LockedExecution,
348 retry_config: ComponentRetryConfig,
349 worker_span: Span,
350 ) -> Result<(), DbErrorWrite> {
351 debug!("Worker::run starting");
352 trace!(
353 version = %locked_execution.next_version,
354 params = ?locked_execution.params,
355 event_history = ?locked_execution.event_history,
356 "Worker::run starting"
357 );
358 let can_be_retried = ExecutionLog::can_be_retried_after(
359 locked_execution.intermittent_event_count + 1,
360 retry_config.max_retries,
361 retry_config.retry_exp_backoff,
362 );
363 let unlock_expiry_on_limit_reached =
364 ExecutionLog::compute_retry_duration_when_retrying_forever(
365 locked_execution.intermittent_event_count + 1,
366 retry_config.retry_exp_backoff,
367 );
368 let ctx = WorkerContext {
369 execution_id: locked_execution.execution_id.clone(),
370 metadata: locked_execution.metadata,
371 ffqn: locked_execution.ffqn,
372 params: locked_execution.params,
373 event_history: locked_execution.event_history,
374 responses: locked_execution
375 .responses
376 .into_iter()
377 .map(|outer| outer.event)
378 .collect(),
379 version: locked_execution.next_version,
380 can_be_retried: can_be_retried.is_some(),
381 locked_event: locked_execution.locked_event,
382 worker_span,
383 };
384 let worker_result = worker.run(ctx).await;
385 trace!(?worker_result, "Worker::run finished");
386 let result_obtained_at = clock_fn.now();
387 match Self::worker_result_to_execution_event(
388 locked_execution.execution_id,
389 worker_result,
390 result_obtained_at,
391 locked_execution.parent,
392 can_be_retried,
393 unlock_expiry_on_limit_reached,
394 )? {
395 Some(append) => {
396 trace!("Appending {append:?}");
397 match append {
398 AppendOrCancel::Cancel {
399 execution_id,
400 cancelled_at,
401 } => db_exec
402 .cancel_activity_with_retries(&execution_id, cancelled_at)
403 .await
404 .map(|_| ()),
405 AppendOrCancel::Other(append) => append.append(db_exec).await,
406 }
407 }
408 None => Ok(()),
409 }
410 }
411
412 fn worker_result_to_execution_event(
414 execution_id: ExecutionId,
415 worker_result: WorkerResult,
416 result_obtained_at: DateTime<Utc>,
417 parent: Option<(ExecutionId, JoinSetId)>,
418 can_be_retried: Option<Duration>,
419 unlock_expiry_on_limit_reached: Duration,
420 ) -> Result<Option<AppendOrCancel>, DbErrorWrite> {
421 Ok(match worker_result {
422 WorkerResult::Ok(result, new_version, http_client_traces) => {
423 info!("Execution finished: {result}");
424 let child_finished =
425 parent.map(
426 |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
427 parent_execution_id,
428 parent_join_set,
429 result: result.clone(),
430 },
431 );
432 let primary_event = AppendRequest {
433 created_at: result_obtained_at,
434 event: ExecutionRequest::Finished {
435 result,
436 http_client_traces,
437 },
438 };
439
440 Some(AppendOrCancel::Other(Append {
441 created_at: result_obtained_at,
442 primary_event,
443 execution_id,
444 version: new_version,
445 child_finished,
446 }))
447 }
448 WorkerResult::DbUpdatedByWorkerOrWatcher => None,
449 WorkerResult::Err(err) => {
450 let reason_generic = err.to_string(); let (primary_event, child_finished, version) = match err {
453 WorkerError::TemporaryTimeout {
454 http_client_traces,
455 version,
456 } => {
457 if let Some(duration) = can_be_retried {
458 let backoff_expires_at = result_obtained_at + duration;
459 info!(
460 "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
461 );
462 (
463 ExecutionRequest::TemporarilyTimedOut {
464 backoff_expires_at,
465 http_client_traces,
466 },
467 None,
468 version,
469 )
470 } else {
471 info!("Execution timed out");
472 let result = SupportedFunctionReturnValue::ExecutionError(
473 FinishedExecutionError {
474 kind: ExecutionFailureKind::TimedOut,
475 reason: None,
476 detail: None,
477 },
478 );
479 let child_finished =
480 parent.map(|(parent_execution_id, parent_join_set)| {
481 ChildFinishedResponse {
482 parent_execution_id,
483 parent_join_set,
484 result: result.clone(),
485 }
486 });
487 (
488 ExecutionRequest::Finished {
489 result,
490 http_client_traces,
491 },
492 child_finished,
493 version,
494 )
495 }
496 }
497 WorkerError::DbError(db_error) => {
498 return Err(db_error);
499 }
500 WorkerError::ActivityTrap {
501 reason: _, trap_kind,
503 detail,
504 version,
505 http_client_traces,
506 } => {
507 if let Some(duration) = can_be_retried {
508 let expires_at = result_obtained_at + duration;
509 debug!(
510 "Retrying activity with `{trap_kind}` execution after {duration:?} at {expires_at}"
511 );
512 (
513 ExecutionRequest::TemporarilyFailed {
514 reason: StrVariant::from(reason_generic),
515 backoff_expires_at: expires_at,
516 detail,
517 http_client_traces,
518 },
519 None,
520 version,
521 )
522 } else {
523 info!(
524 "Activity with `{trap_kind}` marked as permanent failure - {reason_generic}"
525 );
526 let result = SupportedFunctionReturnValue::ExecutionError(
527 FinishedExecutionError {
528 reason: Some(reason_generic),
529 kind: ExecutionFailureKind::Uncategorized,
530 detail,
531 },
532 );
533 let child_finished =
534 parent.map(|(parent_execution_id, parent_join_set)| {
535 ChildFinishedResponse {
536 parent_execution_id,
537 parent_join_set,
538 result: result.clone(),
539 }
540 });
541 (
542 ExecutionRequest::Finished {
543 result,
544 http_client_traces,
545 },
546 child_finished,
547 version,
548 )
549 }
550 }
551 WorkerError::ActivityPreopenedDirError {
552 reason,
553 detail,
554 version,
555 } => {
556 let http_client_traces = None;
557 if let Some(duration) = can_be_retried {
558 let expires_at = result_obtained_at + duration;
559 debug!(
560 "Retrying activity with ActivityPreopenedDirError `{reason}` execution after {duration:?} at {expires_at}"
561 );
562 (
563 ExecutionRequest::TemporarilyFailed {
564 reason: StrVariant::from(reason_generic),
565 backoff_expires_at: expires_at,
566 detail: Some(detail),
567 http_client_traces,
568 },
569 None,
570 version,
571 )
572 } else {
573 info!(
574 "Activity with ActivityPreopenedDirError `{reason}` marked as permanent failure - {reason_generic}"
575 );
576 let result = SupportedFunctionReturnValue::ExecutionError(
577 FinishedExecutionError {
578 reason: Some(reason_generic),
579 kind: ExecutionFailureKind::Uncategorized,
580 detail: Some(detail),
581 },
582 );
583 let child_finished =
584 parent.map(|(parent_execution_id, parent_join_set)| {
585 ChildFinishedResponse {
586 parent_execution_id,
587 parent_join_set,
588 result: result.clone(),
589 }
590 });
591 (
592 ExecutionRequest::Finished {
593 result,
594 http_client_traces,
595 },
596 child_finished,
597 version,
598 )
599 }
600 }
601 WorkerError::ActivityReturnedError {
602 detail,
603 version,
604 http_client_traces,
605 } => {
606 let duration = can_be_retried.expect(
607 "ActivityReturnedError must not be returned when retries are exhausted",
608 );
609 let expires_at = result_obtained_at + duration;
610 debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
611 (
612 ExecutionRequest::TemporarilyFailed {
613 backoff_expires_at: expires_at,
614 reason: StrVariant::Static("activity returned error"), detail, http_client_traces,
617 },
618 None,
619 version,
620 )
621 }
622 WorkerError::LimitReached {
623 reason,
624 version: new_version,
625 } => {
626 let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
627 warn!(
628 "Limit reached: {reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
629 );
630 (
631 ExecutionRequest::Unlocked {
632 backoff_expires_at: expires_at,
633 reason: StrVariant::from(reason),
634 },
635 None,
636 new_version,
637 )
638 }
639 WorkerError::FatalError(FatalError::Cancelled, _version) => {
640 return Ok(Some(AppendOrCancel::Cancel {
642 execution_id,
643 cancelled_at: result_obtained_at,
644 }));
645 }
646 WorkerError::FatalError(fatal_error, version) => {
647 warn!("Fatal worker error - {fatal_error:?}");
648 let result = SupportedFunctionReturnValue::ExecutionError(
649 FinishedExecutionError::from(fatal_error),
650 );
651 let child_finished =
652 parent.map(|(parent_execution_id, parent_join_set)| {
653 ChildFinishedResponse {
654 parent_execution_id,
655 parent_join_set,
656 result: result.clone(),
657 }
658 });
659 (
660 ExecutionRequest::Finished {
661 result,
662 http_client_traces: None,
663 },
664 child_finished,
665 version,
666 )
667 }
668 };
669 Some(AppendOrCancel::Other(Append {
670 created_at: result_obtained_at,
671 primary_event: AppendRequest {
672 created_at: result_obtained_at,
673 event: primary_event,
674 },
675 execution_id,
676 version,
677 child_finished,
678 }))
679 }
680 })
681 }
682}
683
/// Data needed to notify a parent execution that one of its children has finished.
#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    /// Execution that receives the response.
    pub(crate) parent_execution_id: ExecutionId,
    /// Join set of the parent the response is addressed to.
    pub(crate) parent_join_set: JoinSetId,
    /// Result of the finished child execution.
    pub(crate) result: SupportedFunctionReturnValue,
}
690
/// Database action that follows a worker run: cancel the execution or append an event.
#[derive(Debug, Clone)]
#[expect(clippy::large_enum_variant)]
pub(crate) enum AppendOrCancel {
    /// Cancel the execution at the given time.
    Cancel {
        execution_id: ExecutionId,
        cancelled_at: DateTime<Utc>,
    },
    /// Append an event, optionally together with a response to the parent.
    Other(Append),
}
700
/// An event to append to an execution, optionally accompanied by a parent notification.
#[derive(Debug, Clone)]
pub(crate) struct Append {
    /// Timestamp used for the parent response and the batched append call.
    pub(crate) created_at: DateTime<Utc>,
    /// Event appended to the execution identified by `execution_id`.
    pub(crate) primary_event: AppendRequest,
    pub(crate) execution_id: ExecutionId,
    /// Version passed to the append call.
    pub(crate) version: Version,
    /// When set, `primary_event` must be `Finished` and the parent is notified as well.
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}
709
710impl Append {
711 pub(crate) async fn append(self, db_exec: &dyn DbExecutor) -> Result<(), DbErrorWrite> {
712 if let Some(child_finished) = self.child_finished {
713 assert_matches!(
714 &self.primary_event,
715 AppendRequest {
716 event: ExecutionRequest::Finished { .. },
717 ..
718 }
719 );
720 let child_execution_id = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
721 let events = AppendEventsToExecution {
722 execution_id: self.execution_id,
723 version: self.version.clone(),
724 batch: vec![self.primary_event],
725 };
726 let response = AppendResponseToExecution {
727 parent_execution_id: child_finished.parent_execution_id,
728 created_at: self.created_at,
729 join_set_id: child_finished.parent_join_set,
730 child_execution_id,
731 finished_version: self.version, result: child_finished.result,
733 };
734
735 db_exec
736 .append_batch_respond_to_parent(events, response, self.created_at)
737 .await?;
738 } else {
739 db_exec
740 .append(self.execution_id, self.version, self.primary_event)
741 .await?;
742 }
743 Ok(())
744 }
745}
746
/// Scripted worker for tests: replays pre-recorded results instead of running any code.
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes, RETURN_TYPE_DUMMY,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");

    /// Scripted results keyed by the expected version; each value is the expected
    /// event history and the result to return.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    /// Worker that pops the last scripted entry on every run and returns its result.
    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    /// Metadata of the single exported function named `ffqn`.
    fn single_export(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
        [FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: RETURN_TYPE_DUMMY,
            extension: None,
            submittable: true,
        }]
    }

    impl SimpleWorker {
        /// Creates a worker scripted with a single result expected at version 2 with an
        /// empty event history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let scripted = IndexMap::from([(Version::new(2), (vec![], res))]);
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(scripted)))
        }

        /// Replaces the exported function with `ffqn`, keeping the scripted results.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            let exported = single_export(ffqn.clone());
            Self {
                worker_results_rev: self.worker_results_rev,
                ffqn,
                exported,
            }
        }

        /// Creates a worker exporting `FFQN_SOME` that replays `worker_results_rev`.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: single_export(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        /// Pops the last scripted entry, checks that the context matches its expected
        /// version and event history, and returns the scripted result.
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            let (expected_version, (expected_eh, worker_result)) =
                self.worker_results_rev.lock().unwrap().pop().unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            let actual_eh: Vec<_> = ctx
                .event_history
                .iter()
                .map(|(event, _version)| event.clone())
                .collect();
            assert_eq!(expected_eh, actual_eh);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}
832
833#[cfg(test)]
834mod tests {
835 use self::simple_worker::SimpleWorker;
836 use super::*;
837 use crate::{expired_timers_watcher, worker::WorkerResult};
838 use assert_matches::assert_matches;
839 use async_trait::async_trait;
840 use concepts::storage::{
841 CreateRequest, DbConnection, JoinSetRequest, JoinSetResponse, JoinSetResponseEvent,
842 };
843 use concepts::storage::{DbPoolCloseable, LockedBy};
844 use concepts::storage::{ExecutionEvent, ExecutionRequest, HistoryEvent, PendingState};
845 use concepts::time::Now;
846 use concepts::{
847 FunctionMetadata, JoinSetKind, ParameterTypes, Params, RETURN_TYPE_DUMMY,
848 SUPPORTED_RETURN_VALUE_OK_EMPTY, StrVariant, SupportedFunctionReturnValue, TrapKind,
849 };
850 use db_tests::Database;
851 use indexmap::IndexMap;
852 use rstest::rstest;
853 use simple_worker::FFQN_SOME;
854 use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
855 use test_utils::set_up;
856 use test_utils::sim_clock::{ConstClock, SimClock};
857
 // FFQN distinct from `FFQN_SOME`; NOTE(review): presumably for child executions, its
 // uses are outside the visible part of this module.
 pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
859
860 async fn tick_fn<W: Worker + Debug, C: ClockFn + 'static>(
861 config: ExecConfig,
862 clock_fn: C,
863 db_exec: Arc<dyn DbExecutor>,
864 worker: Arc<W>,
865 executed_at: DateTime<Utc>,
866 ) -> Vec<ExecutionId> {
867 trace!("Ticking with {worker:?}");
868 let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
869 let executor = ExecTask::new_test(worker, config, clock_fn, db_exec, ffqns);
870 executor
871 .tick_test_await(executed_at, RunId::generate())
872 .await
873 }
874
 // Runs the tick-based lifecycle scenario against the in-memory database, once per
 // locking strategy.
 #[rstest]
 #[tokio::test]
 async fn execute_simple_lifecycle_tick_based_mem(
     #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentId)]
     locking_strategy: LockingStrategy,
 ) {
     let created_at = Now.now();
     let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
     execute_simple_lifecycle_tick_based(
         db_pool.connection().as_ref(),
         db_exec.clone(),
         ConstClock(created_at),
         locking_strategy,
     )
     .await;
     db_close.close().await;
 }
892
 // Runs the tick-based lifecycle scenario against the SQLite database, once per
 // locking strategy.
 #[rstest]
 #[tokio::test]
 async fn execute_simple_lifecycle_tick_based_sqlite(
     #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentId)]
     locking_strategy: LockingStrategy,
 ) {
     let created_at = Now.now();
     let (_guard, db_pool, db_exec, db_close) = Database::Sqlite.set_up().await;
     execute_simple_lifecycle_tick_based(
         db_pool.connection().as_ref(),
         db_exec.clone(),
         ConstClock(created_at),
         locking_strategy,
     )
     .await;
     db_close.close().await;
 }
910
911 async fn execute_simple_lifecycle_tick_based<C: ClockFn + 'static>(
912 db_connection: &dyn DbConnection,
913 db_exec: Arc<dyn DbExecutor>,
914 clock_fn: C,
915 locking_strategy: LockingStrategy,
916 ) {
917 set_up();
918 let created_at = clock_fn.now();
919 let exec_config = ExecConfig {
920 batch_size: 1,
921 lock_expiry: Duration::from_secs(1),
922 tick_sleep: Duration::from_millis(100),
923 component_id: ComponentId::dummy_activity(),
924 task_limiter: None,
925 executor_id: ExecutorId::generate(),
926 retry_config: ComponentRetryConfig::ZERO,
927 locking_strategy,
928 };
929
930 let execution_log = create_and_tick(
931 CreateAndTickConfig {
932 execution_id: ExecutionId::generate(),
933 created_at,
934 executed_at: created_at,
935 },
936 clock_fn,
937 db_connection,
938 db_exec,
939 exec_config,
940 Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
941 SUPPORTED_RETURN_VALUE_OK_EMPTY,
942 Version::new(2),
943 None,
944 ))),
945 tick_fn,
946 )
947 .await;
948 assert_matches!(
949 execution_log.events.get(2).unwrap(),
950 ExecutionEvent {
951 event: ExecutionRequest::Finished {
952 result: SupportedFunctionReturnValue::Ok { ok: None },
953 http_client_traces: None
954 },
955 created_at: _,
956 backtrace_id: None,
957 version: Version(2),
958 }
959 );
960 }
961
 // Creates an execution in the in-memory database, runs one tick with a worker that
 // returns an empty OK result and expects a `Finished` event at version 2.
 // NOTE(review): despite the name, this drives the executor through `tick_fn` rather
 // than a spawned executor task - confirm whether that is intended.
 #[tokio::test]
 async fn execute_simple_lifecycle_task_based_mem() {
     set_up();
     let created_at = Now.now();
     let clock_fn = ConstClock(created_at);
     let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
     let exec_config = ExecConfig {
         batch_size: 1,
         lock_expiry: Duration::from_secs(1),
         tick_sleep: Duration::ZERO,
         component_id: ComponentId::dummy_activity(),
         task_limiter: None,
         executor_id: ExecutorId::generate(),
         retry_config: ComponentRetryConfig::ZERO,
         locking_strategy: LockingStrategy::default(),
     };

     let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
         SUPPORTED_RETURN_VALUE_OK_EMPTY,
         Version::new(2),
         None,
     )));

     let execution_log = create_and_tick(
         CreateAndTickConfig {
             execution_id: ExecutionId::generate(),
             created_at,
             executed_at: created_at,
         },
         clock_fn,
         db_pool.connection().as_ref(),
         db_exec,
         exec_config,
         worker,
         tick_fn,
     )
     .await;
     assert_matches!(
         execution_log.events.get(2).unwrap(),
         ExecutionEvent {
             event: ExecutionRequest::Finished {
                 result: SupportedFunctionReturnValue::Ok { ok: None },
                 http_client_traces: None
             },
             created_at: _,
             backtrace_id: None,
             version: Version(2),
         }
     );
     db_close.close().await;
 }
1013
 // Inputs of `create_and_tick`.
 struct CreateAndTickConfig {
     // Id of the execution to create.
     execution_id: ExecutionId,
     // Used as both the creation and the scheduling timestamp of the execution.
     created_at: DateTime<Utc>,
     // Timestamp passed to the tick function.
     executed_at: DateTime<Utc>,
 }
1019
 // Creates an execution of `FFQN_SOME`, invokes `tick` once and returns the execution
 // log after verifying its first three events:
 // `Created` (version 0), `Locked` (version 1) and any event at version 2.
 async fn create_and_tick<
     W: Worker,
     C: ClockFn,
     T: FnMut(ExecConfig, C, Arc<dyn DbExecutor>, Arc<W>, DateTime<Utc>) -> F,
     F: Future<Output = Vec<ExecutionId>>,
 >(
     config: CreateAndTickConfig,
     clock_fn: C,
     db_connection: &dyn DbConnection,
     db_exec: Arc<dyn DbExecutor>,
     exec_config: ExecConfig,
     worker: Arc<W>,
     mut tick: T,
 ) -> ExecutionLog {
     // Create the execution, scheduled at its creation time.
     db_connection
         .create(CreateRequest {
             created_at: config.created_at,
             execution_id: config.execution_id.clone(),
             ffqn: FFQN_SOME,
             params: Params::empty(),
             parent: None,
             metadata: concepts::ExecutionMetadata::empty(),
             scheduled_at: config.created_at,
             component_id: ComponentId::dummy_activity(),
             scheduled_by: None,
         })
         .await
         .unwrap();
     // Run the tick; the returned execution ids are not inspected here.
     tick(exec_config, clock_fn, db_exec, worker, config.executed_at).await;
     let execution_log = db_connection.get(&config.execution_id).await.unwrap();
     debug!("Execution history after tick: {execution_log:?}");
     // Event 0: `Created` with the requested timestamp.
     assert_matches!(
         execution_log.events.first().unwrap(),
         ExecutionEvent {
             event: ExecutionRequest::Created { .. },
             created_at: actually_created_at,
             backtrace_id: None,
             version: Version(0),
         }
         if config.created_at == *actually_created_at
     );
     // Event 1: `Locked`, not earlier than the creation.
     let locked_at = assert_matches!(
         execution_log.events.get(1).unwrap(),
         ExecutionEvent {
             event: ExecutionRequest::Locked { .. },
             created_at: locked_at,
             backtrace_id: None,
             version: Version(1),
         } if config.created_at <= *locked_at
         => *locked_at
     );
     // Event 2: whatever the worker produced, not earlier than the lock.
     assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
         event: _,
         created_at: executed_at,
         backtrace_id: None,
         version: Version(2),
     } if *executed_at >= locked_at);
     execution_log
 }
1082
 // An activity trap with one retry configured must produce `TemporarilyFailed` with the
 // configured backoff. The execution must not be picked up before the backoff expires,
 // and must be locked and finished once the simulated clock has moved past it.
 #[tokio::test]
 async fn activity_trap_should_trigger_an_execution_retry() {
     set_up();
     let sim_clock = SimClock::default();
     let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
     let retry_exp_backoff = Duration::from_millis(100);
     let retry_config = ComponentRetryConfig {
         max_retries: 1,
         retry_exp_backoff,
     };
     let exec_config = ExecConfig {
         batch_size: 1,
         lock_expiry: Duration::from_secs(1),
         tick_sleep: Duration::ZERO,
         component_id: ComponentId::dummy_activity(),
         task_limiter: None,
         executor_id: ExecutorId::generate(),
         retry_config,
         locking_strategy: LockingStrategy::default(),
     };
     let expected_reason = "error reason";
     let expected_detail = "error detail";
     // First run: the worker reports a trap.
     let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
         WorkerError::ActivityTrap {
             reason: expected_reason.to_string(),
             trap_kind: concepts::TrapKind::Trap,
             detail: Some(expected_detail.to_string()),
             version: Version::new(2),
             http_client_traces: None,
         },
     )));
     debug!(now = %sim_clock.now(), "Creating an execution that should fail");
     let execution_log = create_and_tick(
         CreateAndTickConfig {
             execution_id: ExecutionId::generate(),
             created_at: sim_clock.now(),
             executed_at: sim_clock.now(),
         },
         sim_clock.clone(),
         db_pool.connection().as_ref(),
         db_exec.clone(),
         exec_config.clone(),
         worker,
         tick_fn,
     )
     .await;
     assert_eq!(3, execution_log.events.len());
     {
         let (reason, detail, at, expires_at) = assert_matches!(
             &execution_log.events.get(2).unwrap(),
             ExecutionEvent {
                 event: ExecutionRequest::TemporarilyFailed {
                     reason,
                     detail,
                     backoff_expires_at,
                     http_client_traces: None,
                 },
                 created_at: at,
                 backtrace_id: None,
                 version: Version(2),
             }
             => (reason, detail, *at, *backoff_expires_at)
         );
         // The stored reason is the rendered worker error, not the raw trap reason.
         assert_eq!(format!("activity trap: {expected_reason}"), reason.deref());
         assert_eq!(Some(expected_detail), detail.as_deref());
         assert_eq!(at, sim_clock.now());
         assert_eq!(sim_clock.now() + retry_config.retry_exp_backoff, expires_at);
     }
     // Second run: the worker is scripted to succeed at version 4.
     let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
         std::sync::Mutex::new(IndexMap::from([(
             Version::new(4),
             (
                 vec![],
                 WorkerResult::Ok(SUPPORTED_RETURN_VALUE_OK_EMPTY, Version::new(4), None),
             ),
         )])),
     )));
     // Ticking before the backoff has expired must not run any execution.
     assert!(
         tick_fn(
             exec_config.clone(),
             sim_clock.clone(),
             db_exec.clone(),
             worker.clone(),
             sim_clock.now(),
         )
         .await
         .is_empty()
     );
     // Move the clock to the backoff expiry and tick again.
     sim_clock.move_time_forward(retry_config.retry_exp_backoff);
     tick_fn(
         exec_config,
         sim_clock.clone(),
         db_exec.clone(),
         worker,
         sim_clock.now(),
     )
     .await;
     let execution_log = {
         let db_connection = db_pool.connection();
         db_connection
             .get(&execution_log.execution_id)
             .await
             .unwrap()
     };
     debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
     assert_matches!(
         execution_log.events.get(3).unwrap(),
         ExecutionEvent {
             event: ExecutionRequest::Locked { .. },
             created_at: at,
             backtrace_id: None,
             version: Version(3),
         } if *at == sim_clock.now()
     );
     assert_matches!(
         execution_log.events.get(4).unwrap(),
         ExecutionEvent {
             event: ExecutionRequest::Finished {
                 result: SupportedFunctionReturnValue::Ok{ok:None},
                 http_client_traces: None
             },
             created_at: finished_at,
             backtrace_id: None,
             version: Version(4),
         } if *finished_at == sim_clock.now()
     );
     db_close.close().await;
 }
1213
1214 #[tokio::test]
1215 async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1216 set_up();
1217 let created_at = Now.now();
1218 let clock_fn = ConstClock(created_at);
1219 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1220 let exec_config = ExecConfig {
1221 batch_size: 1,
1222 lock_expiry: Duration::from_secs(1),
1223 tick_sleep: Duration::ZERO,
1224 component_id: ComponentId::dummy_activity(),
1225 task_limiter: None,
1226 executor_id: ExecutorId::generate(),
1227 retry_config: ComponentRetryConfig::ZERO,
1228 locking_strategy: LockingStrategy::default(),
1229 };
1230
1231 let reason = "error reason";
1232 let expected_reason = format!("activity trap: {reason}");
1233 let expected_detail = "error detail";
1234 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1235 WorkerError::ActivityTrap {
1236 reason: reason.to_string(),
1237 trap_kind: concepts::TrapKind::Trap,
1238 detail: Some(expected_detail.to_string()),
1239 version: Version::new(2),
1240 http_client_traces: None,
1241 },
1242 )));
1243 let execution_log = create_and_tick(
1244 CreateAndTickConfig {
1245 execution_id: ExecutionId::generate(),
1246 created_at,
1247 executed_at: created_at,
1248 },
1249 clock_fn,
1250 db_pool.connection().as_ref(),
1251 db_exec,
1252 exec_config.clone(),
1253 worker,
1254 tick_fn,
1255 )
1256 .await;
1257 assert_eq!(3, execution_log.events.len());
1258 let (reason, kind, detail) = assert_matches!(
1259 &execution_log.events.get(2).unwrap(),
1260 ExecutionEvent {
1261 event: ExecutionRequest::Finished{
1262 result: SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError{reason, kind, detail}),
1263 http_client_traces: None
1264 },
1265 created_at: at,
1266 backtrace_id: None,
1267 version: Version(2),
1268 } if *at == created_at
1269 => (reason, kind, detail)
1270 );
1271
1272 assert_eq!(Some(expected_reason), *reason);
1273 assert_eq!(Some(expected_detail), detail.as_deref());
1274 assert_eq!(ExecutionFailureKind::Uncategorized, *kind);
1275
1276 db_close.close().await;
1277 }
1278
1279 #[tokio::test]
1280 async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1281 let worker_error = WorkerError::ActivityTrap {
1282 reason: "error reason".to_string(),
1283 trap_kind: TrapKind::Trap,
1284 detail: Some("detail".to_string()),
1285 version: Version::new(2),
1286 http_client_traces: None,
1287 };
1288 let expected_child_err = FinishedExecutionError {
1289 kind: ExecutionFailureKind::Uncategorized,
1290 reason: Some("activity trap: error reason".to_string()),
1291 detail: Some("detail".to_string()),
1292 };
1293 child_execution_permanently_failed_should_notify_parent(
1294 WorkerResult::Err(worker_error),
1295 expected_child_err,
1296 )
1297 .await;
1298 }
1299
1300 #[tokio::test]
1301 async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1302 let expected_child_err = FinishedExecutionError {
1303 kind: ExecutionFailureKind::TimedOut,
1304 reason: None,
1305 detail: None,
1306 };
1307 child_execution_permanently_failed_should_notify_parent(
1308 WorkerResult::DbUpdatedByWorkerOrWatcher,
1309 expected_child_err,
1310 )
1311 .await;
1312 }
1313
1314 async fn child_execution_permanently_failed_should_notify_parent(
1315 worker_result: WorkerResult,
1316 expected_child_err: FinishedExecutionError,
1317 ) {
1318 use concepts::storage::JoinSetResponseEventOuter;
1319 const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1320
1321 set_up();
1322 let sim_clock = SimClock::default();
1323 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1324
1325 let parent_worker = Arc::new(SimpleWorker::with_single_result(
1326 WorkerResult::DbUpdatedByWorkerOrWatcher,
1327 ));
1328 let parent_execution_id = ExecutionId::generate();
1329 db_pool
1330 .connection()
1331 .create(CreateRequest {
1332 created_at: sim_clock.now(),
1333 execution_id: parent_execution_id.clone(),
1334 ffqn: FFQN_SOME,
1335 params: Params::empty(),
1336 parent: None,
1337 metadata: concepts::ExecutionMetadata::empty(),
1338 scheduled_at: sim_clock.now(),
1339 component_id: ComponentId::dummy_activity(),
1340 scheduled_by: None,
1341 })
1342 .await
1343 .unwrap();
1344 let parent_executor_id = ExecutorId::generate();
1345 tick_fn(
1346 ExecConfig {
1347 batch_size: 1,
1348 lock_expiry: LOCK_EXPIRY,
1349 tick_sleep: Duration::ZERO,
1350 component_id: ComponentId::dummy_activity(),
1351 task_limiter: None,
1352 executor_id: parent_executor_id,
1353 retry_config: ComponentRetryConfig::ZERO,
1354 locking_strategy: LockingStrategy::default(),
1355 },
1356 sim_clock.clone(),
1357 db_exec.clone(),
1358 parent_worker,
1359 sim_clock.now(),
1360 )
1361 .await;
1362
1363 let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1364 let child_execution_id = parent_execution_id.next_level(&join_set_id);
1365 {
1367 let params = Params::empty();
1368 let child = CreateRequest {
1369 created_at: sim_clock.now(),
1370 execution_id: ExecutionId::Derived(child_execution_id.clone()),
1371 ffqn: FFQN_CHILD,
1372 params: params.clone(),
1373 parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1374 metadata: concepts::ExecutionMetadata::empty(),
1375 scheduled_at: sim_clock.now(),
1376 component_id: ComponentId::dummy_activity(),
1377 scheduled_by: None,
1378 };
1379 let current_time = sim_clock.now();
1380 let join_set = AppendRequest {
1381 created_at: current_time,
1382 event: ExecutionRequest::HistoryEvent {
1383 event: HistoryEvent::JoinSetCreate {
1384 join_set_id: join_set_id.clone(),
1385 },
1386 },
1387 };
1388 let child_exec_req = AppendRequest {
1389 created_at: current_time,
1390 event: ExecutionRequest::HistoryEvent {
1391 event: HistoryEvent::JoinSetRequest {
1392 join_set_id: join_set_id.clone(),
1393 request: JoinSetRequest::ChildExecutionRequest {
1394 child_execution_id: child_execution_id.clone(),
1395 target_ffqn: FFQN_CHILD,
1396 params,
1397 },
1398 },
1399 },
1400 };
1401 let join_next = AppendRequest {
1402 created_at: current_time,
1403 event: ExecutionRequest::HistoryEvent {
1404 event: HistoryEvent::JoinNext {
1405 join_set_id: join_set_id.clone(),
1406 run_expires_at: sim_clock.now(),
1407 closing: false,
1408 requested_ffqn: Some(FFQN_CHILD),
1409 },
1410 },
1411 };
1412 db_pool
1413 .connection()
1414 .append_batch_create_new_execution(
1415 current_time,
1416 vec![join_set, child_exec_req, join_next],
1417 parent_execution_id.clone(),
1418 Version::new(2),
1419 vec![child],
1420 )
1421 .await
1422 .unwrap();
1423 }
1424
1425 let child_worker =
1426 Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));
1427
1428 tick_fn(
1430 ExecConfig {
1431 batch_size: 1,
1432 lock_expiry: LOCK_EXPIRY,
1433 tick_sleep: Duration::ZERO,
1434 component_id: ComponentId::dummy_activity(),
1435 task_limiter: None,
1436 executor_id: ExecutorId::generate(),
1437 retry_config: ComponentRetryConfig::ZERO,
1438 locking_strategy: LockingStrategy::default(),
1439 },
1440 sim_clock.clone(),
1441 db_exec.clone(),
1442 child_worker,
1443 sim_clock.now(),
1444 )
1445 .await;
1446 if matches!(expected_child_err.kind, ExecutionFailureKind::TimedOut) {
1447 sim_clock.move_time_forward(LOCK_EXPIRY);
1449 expired_timers_watcher::tick(db_pool.connection().as_ref(), sim_clock.now())
1450 .await
1451 .unwrap();
1452 }
1453 let child_log = db_pool
1454 .connection()
1455 .get(&ExecutionId::Derived(child_execution_id.clone()))
1456 .await
1457 .unwrap();
1458 assert!(child_log.pending_state.is_finished());
1459 assert_eq!(
1460 Version(2),
1461 child_log.next_version,
1462 "created = 0, locked = 1, with_single_result = 2"
1463 );
1464 assert_eq!(
1465 ExecutionRequest::Finished {
1466 result: SupportedFunctionReturnValue::ExecutionError(expected_child_err),
1467 http_client_traces: None
1468 },
1469 child_log.last_event().event
1470 );
1471 let parent_log = db_pool
1472 .connection()
1473 .get(&parent_execution_id)
1474 .await
1475 .unwrap();
1476 assert_matches!(
1477 parent_log.pending_state,
1478 PendingState::PendingAt {
1479 scheduled_at,
1480 last_lock: Some(LockedBy { executor_id: found_executor_id, run_id: _}),
1481 component_id_input_digest: _
1482 } if scheduled_at == sim_clock.now() && found_executor_id == parent_executor_id,
1483 "parent should be back to pending"
1484 );
1485 let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1486 parent_log.responses.last(),
1487 Some(JoinSetResponseEventOuter{
1488 created_at: at,
1489 event: JoinSetResponseEvent{
1490 join_set_id: found_join_set_id,
1491 event: JoinSetResponse::ChildExecutionFinished {
1492 child_execution_id: found_child_execution_id,
1493 finished_version,
1494 result: found_result,
1495 }
1496 }
1497 })
1498 if *at == sim_clock.now()
1499 => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1500 );
1501 assert_eq!(join_set_id, *found_join_set_id);
1502 assert_eq!(child_execution_id, *found_child_execution_id);
1503 assert_eq!(child_log.next_version, *child_finished_version);
1504 assert_matches!(
1505 found_result,
1506 SupportedFunctionReturnValue::ExecutionError(_)
1507 );
1508
1509 db_close.close().await;
1510 }
1511
1512 #[derive(Clone, Debug)]
1513 struct SleepyWorker {
1514 duration: Duration,
1515 result: SupportedFunctionReturnValue,
1516 exported: [FunctionMetadata; 1],
1517 }
1518
1519 #[async_trait]
1520 impl Worker for SleepyWorker {
1521 async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1522 tokio::time::sleep(self.duration).await;
1523 WorkerResult::Ok(self.result.clone(), ctx.version, None)
1524 }
1525
1526 fn exported_functions(&self) -> &[FunctionMetadata] {
1527 &self.exported
1528 }
1529 }
1530
1531 #[tokio::test]
1532 async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1533 set_up();
1534 let sim_clock = SimClock::default();
1535 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1536 let lock_expiry = Duration::from_millis(100);
1537 let timeout_duration = Duration::from_millis(300);
1538 let retry_config = ComponentRetryConfig {
1539 max_retries: 1,
1540 retry_exp_backoff: timeout_duration,
1541 };
1542 let exec_config = ExecConfig {
1543 batch_size: 1,
1544 lock_expiry,
1545 tick_sleep: Duration::ZERO,
1546 component_id: ComponentId::dummy_activity(),
1547 task_limiter: None,
1548 executor_id: ExecutorId::generate(),
1549 retry_config,
1550 locking_strategy: LockingStrategy::default(),
1551 };
1552
1553 let worker = Arc::new(SleepyWorker {
1554 duration: lock_expiry + Duration::from_millis(1), result: SUPPORTED_RETURN_VALUE_OK_EMPTY,
1556 exported: [FunctionMetadata {
1557 ffqn: FFQN_SOME,
1558 parameter_types: ParameterTypes::default(),
1559 return_type: RETURN_TYPE_DUMMY,
1560 extension: None,
1561 submittable: true,
1562 }],
1563 });
1564 let execution_id = ExecutionId::generate();
1566 let db_connection = db_pool.connection();
1567 db_connection
1568 .create(CreateRequest {
1569 created_at: sim_clock.now(),
1570 execution_id: execution_id.clone(),
1571 ffqn: FFQN_SOME,
1572 params: Params::empty(),
1573 parent: None,
1574 metadata: concepts::ExecutionMetadata::empty(),
1575 scheduled_at: sim_clock.now(),
1576 component_id: ComponentId::dummy_activity(),
1577 scheduled_by: None,
1578 })
1579 .await
1580 .unwrap();
1581
1582 let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
1583 let executor = ExecTask::new_test(
1584 worker,
1585 exec_config.clone(),
1586 sim_clock.clone(),
1587 db_exec.clone(),
1588 ffqns,
1589 );
1590 let mut first_execution_progress = executor
1591 .tick(sim_clock.now(), RunId::generate())
1592 .await
1593 .unwrap();
1594 assert_eq!(1, first_execution_progress.executions.len());
1595 sim_clock.move_time_forward(lock_expiry);
1597 let now_after_first_lock_expiry = sim_clock.now();
1599 {
1600 debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1601 let cleanup_progress = executor
1602 .tick(now_after_first_lock_expiry, RunId::generate())
1603 .await
1604 .unwrap();
1605 assert!(cleanup_progress.executions.is_empty());
1606 }
1607 {
1608 let expired_locks = expired_timers_watcher::tick(
1609 db_pool.connection().as_ref(),
1610 now_after_first_lock_expiry,
1611 )
1612 .await
1613 .unwrap()
1614 .expired_locks;
1615 assert_eq!(1, expired_locks);
1616 }
1617 assert!(
1618 !first_execution_progress
1619 .executions
1620 .pop()
1621 .unwrap()
1622 .1
1623 .is_finished()
1624 );
1625
1626 let execution_log = db_connection.get(&execution_id).await.unwrap();
1627 let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1628 assert_matches!(
1629 &execution_log.events.get(2).unwrap(),
1630 ExecutionEvent {
1631 event: ExecutionRequest::TemporarilyTimedOut { backoff_expires_at, .. },
1632 created_at: at,
1633 backtrace_id: None,
1634 version: Version(2),
1635 } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1636 );
1637 assert_matches!(
1638 execution_log.pending_state,
1639 PendingState::PendingAt {
1640 scheduled_at: found_scheduled_by,
1641 last_lock: Some(LockedBy {
1642 executor_id: found_executor_id,
1643 run_id: _,
1644 }),
1645 component_id_input_digest: _
1646 } if found_scheduled_by == expected_first_timeout_expiry && found_executor_id == exec_config.executor_id
1647 );
1648 sim_clock.move_time_forward(timeout_duration);
1649 let now_after_first_timeout = sim_clock.now();
1650 debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1651
1652 let mut second_execution_progress = executor
1653 .tick(now_after_first_timeout, RunId::generate())
1654 .await
1655 .unwrap();
1656 assert_eq!(1, second_execution_progress.executions.len());
1657
1658 sim_clock.move_time_forward(lock_expiry);
1660 let now_after_second_lock_expiry = sim_clock.now();
1662 debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1663 {
1664 let cleanup_progress = executor
1665 .tick(now_after_second_lock_expiry, RunId::generate())
1666 .await
1667 .unwrap();
1668 assert!(cleanup_progress.executions.is_empty());
1669 }
1670 {
1671 let expired_locks = expired_timers_watcher::tick(
1672 db_pool.connection().as_ref(),
1673 now_after_second_lock_expiry,
1674 )
1675 .await
1676 .unwrap()
1677 .expired_locks;
1678 assert_eq!(1, expired_locks);
1679 }
1680 assert!(
1681 !second_execution_progress
1682 .executions
1683 .pop()
1684 .unwrap()
1685 .1
1686 .is_finished()
1687 );
1688
1689 drop(db_connection);
1690 drop(executor);
1691 db_close.close().await;
1692 }
1693}