1use crate::worker::{FatalError, Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::prefixed_ulid::RunId;
5use concepts::storage::{
6 AppendEventsToExecution, AppendRequest, AppendResponseToExecution, DbErrorGeneric,
7 DbErrorWrite, DbExecutor, DbPool, ExecutionLog, LockedExecution,
8};
9use concepts::time::{ClockFn, Sleep};
10use concepts::{
11 ComponentId, ComponentRetryConfig, FunctionMetadata, StrVariant, SupportedFunctionReturnValue,
12};
13use concepts::{ExecutionFailureKind, JoinSetId};
14use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
15use concepts::{
16 FinishedExecutionError,
17 storage::{ExecutionRequest, Version},
18};
19use std::{
20 sync::{
21 Arc,
22 atomic::{AtomicBool, Ordering},
23 },
24 time::Duration,
25};
26use tokio::task::{AbortHandle, JoinHandle};
27use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
28
/// Configuration of a single executor ([`ExecTask`]).
#[derive(Debug, Clone)]
pub struct ExecConfig {
    /// Lock duration: a tick locks executions until `executed_at + lock_expiry`.
    pub lock_expiry: Duration,
    /// Sleep used by the spawned executor loop between ticks (handed to
    /// `wait_for_pending_by_component_id` as its sleep future), and after a failed
    /// attempt to obtain a DB connection.
    pub tick_sleep: Duration,
    /// Maximum number of executions locked by one tick.
    pub batch_size: u32,
    /// Component this executor runs; passed to the DB when locking and when waiting
    /// for pending executions.
    pub component_id: ComponentId,
    /// Optional semaphore bounding concurrently running worker tasks. One permit is
    /// held by each spawned worker task until it finishes. When `None`, only
    /// `batch_size` limits a tick.
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
    /// Identity passed to the DB when locking and attached to tracing spans.
    pub executor_id: ExecutorId,
    /// `max_retries` and `retry_exp_backoff` used to decide whether and when a
    /// failed run is retried.
    pub retry_config: ComponentRetryConfig,
    /// Whether pending executions are locked by ffqn list or by component id.
    pub locking_strategy: LockingStrategy,
}
40
/// Executor that locks pending executions in the DB and runs each of them on
/// `worker` in its own Tokio task.
pub struct ExecTask<C: ClockFn> {
    /// Runs the locked executions.
    worker: Arc<dyn Worker>,
    config: ExecConfig,
    /// Source of the current time (e.g. when a worker result is obtained).
    clock_fn: C,
    /// Pool used to obtain DB connections for locking and for persisting results.
    db_pool: Arc<dyn DbPool>,
    /// Strategy-specific data needed to lock pending executions.
    locking_strategy_holder: LockingStrategyHolder,
}
48
49#[derive(derive_more::Debug, Default)]
50pub struct ExecutionProgress {
51 #[debug(skip)]
52 #[allow(dead_code)]
53 executions: Vec<(ExecutionId, JoinHandle<()>)>,
54}
55
56impl ExecutionProgress {
57 #[cfg(feature = "test")]
58 pub async fn wait_for_tasks(self) -> Vec<ExecutionId> {
59 let mut vec = Vec::new();
60 for (exe, join_handle) in self.executions {
61 vec.push(exe);
62 join_handle.await.unwrap();
63 }
64 vec
65 }
66}
67
68#[derive(derive_more::Debug)]
69pub struct ExecutorTaskHandle {
70 #[debug(skip)]
71 is_closing: Arc<AtomicBool>,
72 #[debug(skip)]
73 abort_handle: AbortHandle,
74 component_id: ComponentId,
75 executor_id: ExecutorId,
76}
77
78impl ExecutorTaskHandle {
79 #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
80 pub async fn close(&self) {
81 trace!("Gracefully closing");
82 self.is_closing.store(true, Ordering::Relaxed);
90 while !self.abort_handle.is_finished() {
91 tokio::time::sleep(Duration::from_millis(1)).await;
92 }
93 debug!("Gracefully closed");
94 }
95}
96
97impl Drop for ExecutorTaskHandle {
98 #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
99 fn drop(&mut self) {
100 if self.abort_handle.is_finished() {
101 return;
102 }
103 warn!("Aborting the executor task");
104 self.abort_handle.abort();
105 }
106}
107
108#[cfg(feature = "test")]
109pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
110 extract_exported_ffqns_noext(worker)
111}
112
113fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
114 worker
115 .exported_functions()
116 .iter()
117 .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
118 .collect::<Arc<_>>()
119}
120
121#[derive(Debug, Default, Clone, Copy)]
122pub enum LockingStrategy {
123 #[default]
124 ByFfqns,
125 ByComponentId,
126}
127impl LockingStrategy {
128 fn holder(&self, ffqns: Arc<[FunctionFqn]>) -> LockingStrategyHolder {
129 match self {
130 LockingStrategy::ByFfqns => LockingStrategyHolder::ByFfqns(ffqns),
131 LockingStrategy::ByComponentId => LockingStrategyHolder::ByComponentId,
132 }
133 }
134}
135
136enum LockingStrategyHolder {
137 ByFfqns(Arc<[FunctionFqn]>),
138 ByComponentId,
139}
140
141impl<C: ClockFn + 'static> ExecTask<C> {
142 #[cfg(feature = "test")]
143 pub fn new_test(
144 worker: Arc<dyn Worker>,
145 config: ExecConfig,
146 clock_fn: C,
147 db_pool: Arc<dyn DbPool>,
148 ffqns: Arc<[FunctionFqn]>,
149 ) -> Self {
150 Self {
151 worker,
152 locking_strategy_holder: config.locking_strategy.holder(ffqns),
153 config,
154 clock_fn,
155 db_pool,
156 }
157 }
158
159 #[cfg(feature = "test")]
160 pub fn new_all_ffqns_test(
161 worker: Arc<dyn Worker>,
162 config: ExecConfig,
163 clock_fn: C,
164 db_pool: Arc<dyn DbPool>,
165 ) -> Self {
166 let ffqns = extract_exported_ffqns_noext(worker.as_ref());
167 Self {
168 worker,
169 locking_strategy_holder: config.locking_strategy.holder(ffqns),
170 config,
171 clock_fn,
172 db_pool,
173 }
174 }
175
176 pub fn spawn_new(
177 worker: Arc<dyn Worker>,
178 config: ExecConfig,
179 clock_fn: C,
180 db_pool: Arc<dyn DbPool>,
181 sleep: impl Sleep + 'static,
182 ) -> ExecutorTaskHandle {
183 let is_closing = Arc::new(AtomicBool::default());
184 let is_closing_inner = is_closing.clone();
185 let ffqns = extract_exported_ffqns_noext(worker.as_ref());
186 let component_id = config.component_id.clone();
187 let executor_id = config.executor_id;
188 let abort_handle = tokio::spawn(async move {
189 debug!(executor_id = %config.executor_id, component_id = %config.component_id, "Spawned executor");
190 let lock_strategy_holder = config.locking_strategy.holder(ffqns);
191 let task = ExecTask {
192 worker,
193 config,
194 db_pool,
195 locking_strategy_holder: lock_strategy_holder,
196 clock_fn: clock_fn.clone(),
197 };
198 let mut old_err = None;
199 while !is_closing_inner.load(Ordering::Relaxed) {
200 let res = task.db_pool.db_exec_conn().await;
201 let res = log_err_if_new(res, &mut old_err);
202 if let Ok(db_exec) = res {
203 let _ = task.tick(db_exec.as_ref(), clock_fn.now(), RunId::generate()).await;
204 db_exec
205 .wait_for_pending_by_component_id(clock_fn.now(), &task.config.component_id, {
206 let sleep = sleep.clone();
207 Box::pin(async move { sleep.sleep(task.config.tick_sleep).await })})
208 .await;
209 } else {
210 sleep.sleep(task.config.tick_sleep).await;
211 }
212 }
213 })
214 .abort_handle();
215 ExecutorTaskHandle {
216 is_closing,
217 abort_handle,
218 component_id,
219 executor_id,
220 }
221 }
222
223 fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
224 if let Some(task_limiter) = &self.config.task_limiter {
225 let mut locks = Vec::new();
226 for _ in 0..self.config.batch_size {
227 if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
228 locks.push(Some(permit));
229 } else {
230 break;
231 }
232 }
233 locks
234 } else {
235 let mut vec = Vec::with_capacity(self.config.batch_size as usize);
236 for _ in 0..self.config.batch_size {
237 vec.push(None);
238 }
239 vec
240 }
241 }
242
243 #[cfg(feature = "test")]
244 pub async fn tick_test(&self, executed_at: DateTime<Utc>, run_id: RunId) -> ExecutionProgress {
245 let db_exec = self.db_pool.db_exec_conn().await.unwrap();
246 self.tick(db_exec.as_ref(), executed_at, run_id)
247 .await
248 .unwrap()
249 }
250
251 #[cfg(feature = "test")]
252 pub async fn tick_test_await(
253 &self,
254 executed_at: DateTime<Utc>,
255 run_id: RunId,
256 ) -> Vec<ExecutionId> {
257 let db_exec = self.db_pool.db_exec_conn().await.unwrap();
258 self.tick(db_exec.as_ref(), executed_at, run_id)
259 .await
260 .unwrap()
261 .wait_for_tasks()
262 .await
263 }
264
265 #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.config.executor_id, component_id = %self.config.component_id))]
266 async fn tick(
267 &self,
268 db_exec: &dyn DbExecutor,
269 executed_at: DateTime<Utc>,
270 run_id: RunId,
271 ) -> Result<ExecutionProgress, DbErrorGeneric> {
272 let locked_executions = {
273 let mut permits = self.acquire_task_permits();
274 if permits.is_empty() {
275 return Ok(ExecutionProgress::default());
276 }
277 let lock_expires_at = executed_at + self.config.lock_expiry;
278 let batch_size = u32::try_from(permits.len()).expect("ExecConfig.batch_size is u32");
279 let locked_executions = match &self.locking_strategy_holder {
280 LockingStrategyHolder::ByFfqns(ffqns) => {
281 db_exec
282 .lock_pending_by_ffqns(
283 batch_size,
284 executed_at, ffqns.clone(),
286 executed_at, self.config.component_id.clone(),
288 self.config.executor_id,
289 lock_expires_at,
290 run_id,
291 self.config.retry_config,
292 )
293 .await?
294 }
295 LockingStrategyHolder::ByComponentId => {
296 db_exec
297 .lock_pending_by_component_id(
298 batch_size,
299 executed_at, &self.config.component_id,
301 executed_at, self.config.executor_id,
303 lock_expires_at,
304 run_id,
305 self.config.retry_config,
306 )
307 .await?
308 }
309 };
310 while permits.len() > locked_executions.len() {
312 permits.pop();
313 }
314 assert_eq!(permits.len(), locked_executions.len());
315 locked_executions.into_iter().zip(permits)
316 };
317
318 let mut executions = Vec::with_capacity(locked_executions.len());
319 for (locked_execution, permit) in locked_executions {
320 let execution_id = locked_execution.execution_id.clone();
321 let join_handle = {
322 let worker = self.worker.clone();
323 let db_pool = self.db_pool.clone();
324 let clock_fn = self.clock_fn.clone();
325 let worker_span = info_span!(parent: None, "worker",
326 "otel.name" = format!("worker {}", locked_execution.ffqn),
327 %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.config.executor_id, component_id = %self.config.component_id);
328 locked_execution.metadata.enrich(&worker_span);
329 tokio::spawn({
330 let worker_span2 = worker_span.clone();
331 let retry_config = self.config.retry_config;
332 async move {
333 let _permit = permit;
334 let res = Self::run_worker(
335 worker,
336 db_pool.as_ref(),
337 clock_fn,
338 locked_execution,
339 retry_config,
340 worker_span2,
341 )
342 .await;
343 if let Err(db_error) = res {
344 error!("Got db error `{db_error:?}`, expecting watcher to mark execution as timed out");
345 }
346 }
347 .instrument(worker_span)
348 })
349 };
350 executions.push((execution_id, join_handle));
351 }
352 Ok(ExecutionProgress { executions })
353 }
354
355 async fn run_worker(
356 worker: Arc<dyn Worker>,
357 db_pool: &dyn DbPool,
358 clock_fn: C,
359 locked_execution: LockedExecution,
360 retry_config: ComponentRetryConfig,
361 worker_span: Span,
362 ) -> Result<(), DbErrorWrite> {
363 debug!("Worker::run starting");
364 trace!(
365 version = %locked_execution.next_version,
366 params = ?locked_execution.params,
367 event_history = ?locked_execution.event_history,
368 "Worker::run starting"
369 );
370 let can_be_retried = ExecutionLog::can_be_retried_after(
371 locked_execution.intermittent_event_count + 1,
372 retry_config.max_retries,
373 retry_config.retry_exp_backoff,
374 );
375 let unlock_expiry_on_limit_reached =
376 ExecutionLog::compute_retry_duration_when_retrying_forever(
377 locked_execution.intermittent_event_count + 1,
378 retry_config.retry_exp_backoff,
379 );
380 let ctx = WorkerContext {
381 execution_id: locked_execution.execution_id.clone(),
382 metadata: locked_execution.metadata,
383 ffqn: locked_execution.ffqn,
384 params: locked_execution.params,
385 event_history: locked_execution.event_history,
386 responses: locked_execution
387 .responses
388 .into_iter()
389 .map(|outer| outer.event)
390 .collect(),
391 version: locked_execution.next_version,
392 can_be_retried: can_be_retried.is_some(),
393 locked_event: locked_execution.locked_event,
394 worker_span,
395 };
396 let worker_result = worker.run(ctx).await;
397 trace!(?worker_result, "Worker::run finished");
398 let result_obtained_at = clock_fn.now();
399 match Self::worker_result_to_execution_event(
400 locked_execution.execution_id,
401 worker_result,
402 result_obtained_at,
403 locked_execution.parent,
404 can_be_retried,
405 unlock_expiry_on_limit_reached,
406 )? {
407 Some(append) => {
408 trace!("Appending {append:?}");
409 let db_exec = db_pool.db_exec_conn().await?;
410 match append {
411 AppendOrCancel::Cancel {
412 execution_id,
413 cancelled_at,
414 } => db_exec
415 .cancel_activity_with_retries(&execution_id, cancelled_at)
416 .await
417 .map(|_| ()),
418 AppendOrCancel::Other(append) => append.append(db_exec.as_ref()).await,
419 }
420 }
421 None => Ok(()),
422 }
423 }
424
425 fn worker_result_to_execution_event(
427 execution_id: ExecutionId,
428 worker_result: WorkerResult,
429 result_obtained_at: DateTime<Utc>,
430 parent: Option<(ExecutionId, JoinSetId)>,
431 can_be_retried: Option<Duration>,
432 unlock_expiry_on_limit_reached: Duration,
433 ) -> Result<Option<AppendOrCancel>, DbErrorWrite> {
434 Ok(match worker_result {
435 WorkerResult::Ok(result, new_version, http_client_traces) => {
436 info!("Execution finished: {result}");
437 let child_finished =
438 parent.map(
439 |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
440 parent_execution_id,
441 parent_join_set,
442 result: result.clone(),
443 },
444 );
445 let primary_event = AppendRequest {
446 created_at: result_obtained_at,
447 event: ExecutionRequest::Finished {
448 result,
449 http_client_traces,
450 },
451 };
452
453 Some(AppendOrCancel::Other(Append {
454 created_at: result_obtained_at,
455 primary_event,
456 execution_id,
457 version: new_version,
458 child_finished,
459 }))
460 }
461 WorkerResult::DbUpdatedByWorkerOrWatcher => None,
462 WorkerResult::Err(err) => {
463 let reason_generic = err.to_string(); let (primary_event, child_finished, version) = match err {
466 WorkerError::TemporaryTimeout {
467 http_client_traces,
468 version,
469 } => {
470 if let Some(duration) = can_be_retried {
471 let backoff_expires_at = result_obtained_at + duration;
472 info!(
473 "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
474 );
475 (
476 ExecutionRequest::TemporarilyTimedOut {
477 backoff_expires_at,
478 http_client_traces,
479 },
480 None,
481 version,
482 )
483 } else {
484 info!("Execution timed out");
485 let result = SupportedFunctionReturnValue::ExecutionError(
486 FinishedExecutionError {
487 kind: ExecutionFailureKind::TimedOut,
488 reason: None,
489 detail: None,
490 },
491 );
492 let child_finished =
493 parent.map(|(parent_execution_id, parent_join_set)| {
494 ChildFinishedResponse {
495 parent_execution_id,
496 parent_join_set,
497 result: result.clone(),
498 }
499 });
500 (
501 ExecutionRequest::Finished {
502 result,
503 http_client_traces,
504 },
505 child_finished,
506 version,
507 )
508 }
509 }
510 WorkerError::DbError(db_error) => {
511 return Err(db_error);
512 }
513 WorkerError::ActivityTrap {
514 reason: _, trap_kind,
516 detail,
517 version,
518 http_client_traces,
519 } => {
520 if let Some(duration) = can_be_retried {
521 let expires_at = result_obtained_at + duration;
522 debug!(
523 "Retrying activity with `{trap_kind}` execution after {duration:?} at {expires_at}"
524 );
525 (
526 ExecutionRequest::TemporarilyFailed {
527 reason: StrVariant::from(reason_generic),
528 backoff_expires_at: expires_at,
529 detail,
530 http_client_traces,
531 },
532 None,
533 version,
534 )
535 } else {
536 info!(
537 "Activity with `{trap_kind}` marked as permanent failure - {reason_generic}"
538 );
539 let result = SupportedFunctionReturnValue::ExecutionError(
540 FinishedExecutionError {
541 reason: Some(reason_generic),
542 kind: ExecutionFailureKind::Uncategorized,
543 detail,
544 },
545 );
546 let child_finished =
547 parent.map(|(parent_execution_id, parent_join_set)| {
548 ChildFinishedResponse {
549 parent_execution_id,
550 parent_join_set,
551 result: result.clone(),
552 }
553 });
554 (
555 ExecutionRequest::Finished {
556 result,
557 http_client_traces,
558 },
559 child_finished,
560 version,
561 )
562 }
563 }
564 WorkerError::ActivityPreopenedDirError {
565 reason,
566 detail,
567 version,
568 } => {
569 let http_client_traces = None;
570 if let Some(duration) = can_be_retried {
571 let expires_at = result_obtained_at + duration;
572 debug!(
573 "Retrying activity with ActivityPreopenedDirError `{reason}` execution after {duration:?} at {expires_at}"
574 );
575 (
576 ExecutionRequest::TemporarilyFailed {
577 reason: StrVariant::from(reason_generic),
578 backoff_expires_at: expires_at,
579 detail: Some(detail),
580 http_client_traces,
581 },
582 None,
583 version,
584 )
585 } else {
586 info!(
587 "Activity with ActivityPreopenedDirError `{reason}` marked as permanent failure - {reason_generic}"
588 );
589 let result = SupportedFunctionReturnValue::ExecutionError(
590 FinishedExecutionError {
591 reason: Some(reason_generic),
592 kind: ExecutionFailureKind::Uncategorized,
593 detail: Some(detail),
594 },
595 );
596 let child_finished =
597 parent.map(|(parent_execution_id, parent_join_set)| {
598 ChildFinishedResponse {
599 parent_execution_id,
600 parent_join_set,
601 result: result.clone(),
602 }
603 });
604 (
605 ExecutionRequest::Finished {
606 result,
607 http_client_traces,
608 },
609 child_finished,
610 version,
611 )
612 }
613 }
614 WorkerError::ActivityReturnedError {
615 detail,
616 version,
617 http_client_traces,
618 } => {
619 let duration = can_be_retried.expect(
620 "ActivityReturnedError must not be returned when retries are exhausted",
621 );
622 let expires_at = result_obtained_at + duration;
623 debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
624 (
625 ExecutionRequest::TemporarilyFailed {
626 backoff_expires_at: expires_at,
627 reason: StrVariant::Static("activity returned error"), detail, http_client_traces,
630 },
631 None,
632 version,
633 )
634 }
635 WorkerError::LimitReached {
636 reason,
637 version: new_version,
638 } => {
639 let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
640 warn!(
641 "Limit reached: {reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
642 );
643 (
644 ExecutionRequest::Unlocked {
645 backoff_expires_at: expires_at,
646 reason: StrVariant::from(reason),
647 },
648 None,
649 new_version,
650 )
651 }
652 WorkerError::FatalError(FatalError::Cancelled, _version) => {
653 return Ok(Some(AppendOrCancel::Cancel {
655 execution_id,
656 cancelled_at: result_obtained_at,
657 }));
658 }
659 WorkerError::FatalError(fatal_error, version) => {
660 warn!("Fatal worker error - {fatal_error:?}");
661 let result = SupportedFunctionReturnValue::ExecutionError(
662 FinishedExecutionError::from(fatal_error),
663 );
664 let child_finished =
665 parent.map(|(parent_execution_id, parent_join_set)| {
666 ChildFinishedResponse {
667 parent_execution_id,
668 parent_join_set,
669 result: result.clone(),
670 }
671 });
672 (
673 ExecutionRequest::Finished {
674 result,
675 http_client_traces: None,
676 },
677 child_finished,
678 version,
679 )
680 }
681 };
682 Some(AppendOrCancel::Other(Append {
683 created_at: result_obtained_at,
684 primary_event: AppendRequest {
685 created_at: result_obtained_at,
686 event: primary_event,
687 },
688 execution_id,
689 version,
690 child_finished,
691 }))
692 }
693 })
694 }
695}
696
/// Result of a finished child execution, addressed to its parent's join set.
#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    /// Parent execution, taken from `LockedExecution::parent`.
    pub(crate) parent_execution_id: ExecutionId,
    /// Parent's join set the response is delivered to.
    pub(crate) parent_join_set: JoinSetId,
    /// The child's final result; a clone of the one in its `Finished` event.
    pub(crate) result: SupportedFunctionReturnValue,
}
703
/// DB write requested after a worker run.
#[derive(Debug, Clone)]
// `Append` is much larger than `Cancel`. NOTE(review): presumably left unboxed
// because these values are short-lived; confirm.
#[expect(clippy::large_enum_variant)]
pub(crate) enum AppendOrCancel {
    /// Cancel the execution via `cancel_activity_with_retries`; produced for
    /// `FatalError::Cancelled`.
    Cancel {
        execution_id: ExecutionId,
        cancelled_at: DateTime<Utc>,
    },
    /// Append the event(s) described by [`Append`].
    Other(Append),
}
713
714#[derive(Debug, Clone)]
715pub(crate) struct Append {
716 pub(crate) created_at: DateTime<Utc>,
717 pub(crate) primary_event: AppendRequest,
718 pub(crate) execution_id: ExecutionId,
719 pub(crate) version: Version,
720 pub(crate) child_finished: Option<ChildFinishedResponse>,
721}
722
723impl Append {
724 pub(crate) async fn append(self, db_exec: &dyn DbExecutor) -> Result<(), DbErrorWrite> {
725 if let Some(child_finished) = self.child_finished {
726 assert_matches!(
727 &self.primary_event,
728 AppendRequest {
729 event: ExecutionRequest::Finished { .. },
730 ..
731 }
732 );
733 let child_execution_id = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
734 let events = AppendEventsToExecution {
735 execution_id: self.execution_id,
736 version: self.version.clone(),
737 batch: vec![self.primary_event],
738 };
739 let response = AppendResponseToExecution {
740 parent_execution_id: child_finished.parent_execution_id,
741 created_at: self.created_at,
742 join_set_id: child_finished.parent_join_set,
743 child_execution_id,
744 finished_version: self.version, result: child_finished.result,
746 };
747
748 db_exec
749 .append_batch_respond_to_parent(events, response, self.created_at)
750 .await?;
751 } else {
752 db_exec
753 .append(self.execution_id, self.version, self.primary_event)
754 .await?;
755 }
756 Ok(())
757 }
758}
759
760fn log_err_if_new<T>(
761 res: Result<T, DbErrorGeneric>,
762 old_err: &mut Option<DbErrorGeneric>,
763) -> Result<T, ()> {
764 match (res, &old_err) {
765 (Ok(ok), _) => {
766 *old_err = None;
767 Ok(ok)
768 }
769 (Err(err), Some(old)) if err == *old => Err(()),
770 (Err(err), _) => {
771 warn!("Tick failed: {err:?}");
772 *old_err = Some(err);
773 Err(())
774 }
775 }
776}
777
/// Scripted [`Worker`] for tests. It replays pre-recorded results and asserts that
/// each run sees the expected version and event history.
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes, RETURN_TYPE_DUMMY,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    /// Function exported by [`SimpleWorker`] unless overridden with `with_ffqn`.
    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");

    /// Scripted runs keyed by the version the run must see, each with the expected
    /// event history and the result to return. Entries are taken from the back
    /// (`IndexMap::pop`), i.e. in reverse insertion order.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    /// Metadata for the single function `ffqn` exported by a [`SimpleWorker`].
    fn exported_metadata(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
        [FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: RETURN_TYPE_DUMMY,
            extension: None,
            submittable: true,
        }]
    }

    impl SimpleWorker {
        /// Worker that returns `res` once, expecting version 2 and an empty history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let scripted = IndexMap::from([(Version::new(2), (vec![], res))]);
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(scripted)))
        }

        /// Replaces the exported function with `ffqn`, keeping the scripted results.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            Self {
                exported: exported_metadata(ffqn.clone()),
                ffqn,
                ..self
            }
        }

        /// Worker exporting [`FFQN_SOME`] that replays `worker_results_rev`.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: exported_metadata(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        /// Pops the next scripted entry, asserts the context matches it, and returns
        /// its result.
        ///
        /// # Panics
        /// Panics if no scripted entries are left, or if the version or event history
        /// differ from the expected ones.
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            let next = self.worker_results_rev.lock().unwrap().pop();
            let (expected_version, (expected_eh, worker_result)) = next.unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            let actual_eh: Vec<_> = ctx
                .event_history
                .iter()
                .map(|(event, _version)| event.clone())
                .collect();
            assert_eq!(expected_eh, actual_eh);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}
863
864#[cfg(test)]
865mod tests {
866 use self::simple_worker::SimpleWorker;
867 use super::*;
868 use crate::{expired_timers_watcher, worker::WorkerResult};
869 use assert_matches::assert_matches;
870 use async_trait::async_trait;
871 use concepts::storage::{
872 CreateRequest, DbConnectionTest, JoinSetRequest, JoinSetResponse, JoinSetResponseEvent,
873 };
874 use concepts::storage::{DbPoolCloseable, LockedBy};
875 use concepts::storage::{ExecutionEvent, ExecutionRequest, HistoryEvent, PendingState};
876 use concepts::time::Now;
877 use concepts::{
878 FunctionMetadata, JoinSetKind, ParameterTypes, Params, RETURN_TYPE_DUMMY,
879 SUPPORTED_RETURN_VALUE_OK_EMPTY, StrVariant, SupportedFunctionReturnValue, TrapKind,
880 };
881 use db_tests::Database;
882 use indexmap::IndexMap;
883 use rstest::rstest;
884 use simple_worker::FFQN_SOME;
885 use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
886 use test_db_macro::expand_enum_database;
887 use test_utils::set_up;
888 use test_utils::sim_clock::{ConstClock, SimClock};
889
    // Secondary function name in the same `pkg/ifc` interface as `FFQN_SOME`.
    // NOTE(review): presumably used for child executions by tests beyond this view; confirm.
    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
891
892 async fn tick_fn<W: Worker + Debug, C: ClockFn + 'static>(
893 config: ExecConfig,
894 clock_fn: C,
895 db_pool: Arc<dyn DbPool>,
896 worker: Arc<W>,
897 executed_at: DateTime<Utc>,
898 ) -> Vec<ExecutionId> {
899 trace!("Ticking with {worker:?}");
900 let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
901 let executor = ExecTask::new_test(worker, config, clock_fn, db_pool, ffqns);
902 executor
903 .tick_test_await(executed_at, RunId::generate())
904 .await
905 }
906
    // Runs the simple create -> lock -> finish scenario for both locking strategies.
    // `expand_enum_database` presumably instantiates it once per `Database` backend
    // (TODO confirm). The clock is frozen at the current time.
    #[expand_enum_database]
    #[rstest]
    #[tokio::test]
    async fn execute_simple_lifecycle_tick_based(
        database: Database,
        #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentId)]
        locking_strategy: LockingStrategy,
    ) {
        set_up();
        let created_at = Now.now();
        let (_guard, db_pool, db_close) = database.set_up().await;
        let db_connection = db_pool.connection_test().await.unwrap();
        execute_simple_lifecycle_tick_based_inner(
            db_connection.as_ref(),
            db_pool.clone(),
            ConstClock(created_at),
            locking_strategy,
        )
        .await;
        // Release the test connection before closing the pool.
        drop(db_connection);
        db_close.close().await;
    }
929
    /// Creates one execution, ticks it once with a [`SimpleWorker`] that succeeds at
    /// version 2, and asserts that the third event is an empty `Finished` result.
    async fn execute_simple_lifecycle_tick_based_inner<C: ClockFn + 'static>(
        db_connection: &dyn DbConnectionTest,
        db_pool: Arc<dyn DbPool>,
        clock_fn: C,
        locking_strategy: LockingStrategy,
    ) {
        let created_at = clock_fn.now();
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::from_millis(100),
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
            retry_config: ComponentRetryConfig::ZERO,
            locking_strategy,
        };

        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                executed_at: created_at,
            },
            clock_fn,
            db_connection,
            db_pool,
            exec_config,
            Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
                SUPPORTED_RETURN_VALUE_OK_EMPTY,
                Version::new(2),
                None,
            ))),
            tick_fn,
        )
        .await;
        assert_matches!(
            execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionRequest::Finished {
                    result: SupportedFunctionReturnValue::Ok { ok: None },
                    http_client_traces: None
                },
                created_at: _,
                backtrace_id: None,
                version: Version(2),
            }
        );
    }
979
    /// Same scenario as the tick-based test, hard-wired to the in-memory database and
    /// the default locking strategy (`ByFfqns`).
    ///
    /// NOTE(review): despite the `task_based` name, this drives the executor through
    /// `tick_fn` (one tick), not `ExecTask::spawn_new`. Confirm the intent.
    #[tokio::test]
    async fn execute_simple_lifecycle_task_based_mem() {
        set_up();
        let created_at = Now.now();
        let clock_fn = ConstClock(created_at);
        let (_guard, db_pool, db_close) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
            retry_config: ComponentRetryConfig::ZERO,
            locking_strategy: LockingStrategy::default(),
        };

        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
            SUPPORTED_RETURN_VALUE_OK_EMPTY,
            Version::new(2),
            None,
        )));
        let db_connection = db_pool.connection_test().await.unwrap();

        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                executed_at: created_at,
            },
            clock_fn,
            db_connection.as_ref(),
            db_pool,
            exec_config,
            worker,
            tick_fn,
        )
        .await;
        assert_matches!(
            execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionRequest::Finished {
                    result: SupportedFunctionReturnValue::Ok { ok: None },
                    http_client_traces: None
                },
                created_at: _,
                backtrace_id: None,
                version: Version(2),
            }
        );
        db_close.close().await;
    }
1032
    /// Inputs for [`create_and_tick`].
    struct CreateAndTickConfig {
        /// Id of the execution to create.
        execution_id: ExecutionId,
        /// Creation time; also used as the execution's `scheduled_at`.
        created_at: DateTime<Utc>,
        /// Time passed to the tick function.
        executed_at: DateTime<Utc>,
    }
1038
    /// Creates a pending execution of [`FFQN_SOME`] and runs `tick` once. It then
    /// checks the first three events of the log:
    /// 1. `Created`, at version 0 and exactly at `config.created_at`;
    /// 2. `Locked`, at version 1 and not before creation;
    /// 3. an event at version 2 that is not earlier than the lock.
    ///
    /// Returns the execution log read back after the tick. The ids returned by
    /// `tick` are ignored.
    async fn create_and_tick<
        W: Worker,
        C: ClockFn,
        T: FnMut(ExecConfig, C, Arc<dyn DbPool>, Arc<W>, DateTime<Utc>) -> F,
        F: Future<Output = Vec<ExecutionId>>,
    >(
        config: CreateAndTickConfig,
        clock_fn: C,
        db_connection: &dyn DbConnectionTest,
        db_pool: Arc<dyn DbPool>,
        exec_config: ExecConfig,
        worker: Arc<W>,
        mut tick: T,
    ) -> ExecutionLog {
        db_connection
            .create(CreateRequest {
                created_at: config.created_at,
                execution_id: config.execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: config.created_at,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();
        tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
        debug!("Execution history after tick: {execution_log:?}");
        let actually_created_at = assert_matches!(
            execution_log.events.first().unwrap(),
            ExecutionEvent {
                event: ExecutionRequest::Created { .. },
                created_at: actually_created_at,
                backtrace_id: None,
                version: Version(0),
            }
            => *actually_created_at
        );
        assert_eq!(config.created_at, actually_created_at);
        let locked_at = assert_matches!(
            execution_log.events.get(1).unwrap(),
            ExecutionEvent {
                event: ExecutionRequest::Locked { .. },
                created_at: locked_at,
                backtrace_id: None,
                version: Version(1),
            } if config.created_at <= *locked_at
            => *locked_at
        );
        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
            event: _,
            created_at: executed_at,
            backtrace_id: None,
            version: Version(2),
        } if *executed_at >= locked_at);
        execution_log
    }
1102
1103 #[tokio::test]
1104 async fn activity_trap_should_trigger_an_execution_retry() {
1105 set_up();
1106 let sim_clock = SimClock::default();
1107 let (_guard, db_pool, db_close) = Database::Memory.set_up().await;
1108 let retry_exp_backoff = Duration::from_millis(100);
1109 let retry_config = ComponentRetryConfig {
1110 max_retries: Some(1),
1111 retry_exp_backoff,
1112 };
1113 let exec_config = ExecConfig {
1114 batch_size: 1,
1115 lock_expiry: Duration::from_secs(1),
1116 tick_sleep: Duration::ZERO,
1117 component_id: ComponentId::dummy_activity(),
1118 task_limiter: None,
1119 executor_id: ExecutorId::generate(),
1120 retry_config,
1121 locking_strategy: LockingStrategy::default(),
1122 };
1123 let expected_reason = "error reason";
1124 let expected_detail = "error detail";
1125 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1126 WorkerError::ActivityTrap {
1127 reason: expected_reason.to_string(),
1128 trap_kind: concepts::TrapKind::Trap,
1129 detail: Some(expected_detail.to_string()),
1130 version: Version::new(2),
1131 http_client_traces: None,
1132 },
1133 )));
1134 debug!(now = %sim_clock.now(), "Creating an execution that should fail");
1135 let db_connection = db_pool.connection_test().await.unwrap();
1136 let execution_log = create_and_tick(
1137 CreateAndTickConfig {
1138 execution_id: ExecutionId::generate(),
1139 created_at: sim_clock.now(),
1140 executed_at: sim_clock.now(),
1141 },
1142 sim_clock.clone(),
1143 db_connection.as_ref(),
1144 db_pool.clone(),
1145 exec_config.clone(),
1146 worker,
1147 tick_fn,
1148 )
1149 .await;
1150 assert_eq!(3, execution_log.events.len());
1151 {
1152 let (reason, detail, at, expires_at) = assert_matches!(
1153 &execution_log.events.get(2).unwrap(),
1154 ExecutionEvent {
1155 event: ExecutionRequest::TemporarilyFailed {
1156 reason,
1157 detail,
1158 backoff_expires_at,
1159 http_client_traces: None,
1160 },
1161 created_at: at,
1162 backtrace_id: None,
1163 version: Version(2),
1164 }
1165 => (reason, detail, *at, *backoff_expires_at)
1166 );
1167 assert_eq!(format!("activity trap: {expected_reason}"), reason.deref());
1168 assert_eq!(Some(expected_detail), detail.as_deref());
1169 assert_eq!(at, sim_clock.now());
1170 assert_eq!(sim_clock.now() + retry_config.retry_exp_backoff, expires_at);
1171 }
1172 let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
1173 std::sync::Mutex::new(IndexMap::from([(
1174 Version::new(4),
1175 (
1176 vec![],
1177 WorkerResult::Ok(SUPPORTED_RETURN_VALUE_OK_EMPTY, Version::new(4), None),
1178 ),
1179 )])),
1180 )));
1181 assert!(
1183 tick_fn(
1184 exec_config.clone(),
1185 sim_clock.clone(),
1186 db_pool.clone(),
1187 worker.clone(),
1188 sim_clock.now(),
1189 )
1190 .await
1191 .is_empty()
1192 );
1193 sim_clock.move_time_forward(retry_config.retry_exp_backoff);
1195 tick_fn(
1196 exec_config,
1197 sim_clock.clone(),
1198 db_pool.clone(),
1199 worker,
1200 sim_clock.now(),
1201 )
1202 .await;
1203 let execution_log = {
1204 let db_connection = db_pool.connection_test().await.unwrap();
1205 db_connection
1206 .get(&execution_log.execution_id)
1207 .await
1208 .unwrap()
1209 };
1210 debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
1211 assert_matches!(
1212 execution_log.events.get(3).unwrap(),
1213 ExecutionEvent {
1214 event: ExecutionRequest::Locked { .. },
1215 created_at: at,
1216 backtrace_id: None,
1217 version: Version(3),
1218 } if *at == sim_clock.now()
1219 );
1220 assert_matches!(
1221 execution_log.events.get(4).unwrap(),
1222 ExecutionEvent {
1223 event: ExecutionRequest::Finished {
1224 result: SupportedFunctionReturnValue::Ok{ok:None},
1225 http_client_traces: None
1226 },
1227 created_at: finished_at,
1228 backtrace_id: None,
1229 version: Version(4),
1230 } if *finished_at == sim_clock.now()
1231 );
1232 db_close.close().await;
1233 }
1234
1235 #[tokio::test]
1236 async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1237 set_up();
1238 let created_at = Now.now();
1239 let clock_fn = ConstClock(created_at);
1240 let (_guard, db_pool, db_close) = Database::Memory.set_up().await;
1241 let exec_config = ExecConfig {
1242 batch_size: 1,
1243 lock_expiry: Duration::from_secs(1),
1244 tick_sleep: Duration::ZERO,
1245 component_id: ComponentId::dummy_activity(),
1246 task_limiter: None,
1247 executor_id: ExecutorId::generate(),
1248 retry_config: ComponentRetryConfig::ZERO,
1249 locking_strategy: LockingStrategy::default(),
1250 };
1251
1252 let reason = "error reason";
1253 let expected_reason = format!("activity trap: {reason}");
1254 let expected_detail = "error detail";
1255 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1256 WorkerError::ActivityTrap {
1257 reason: reason.to_string(),
1258 trap_kind: concepts::TrapKind::Trap,
1259 detail: Some(expected_detail.to_string()),
1260 version: Version::new(2),
1261 http_client_traces: None,
1262 },
1263 )));
1264 let execution_log = create_and_tick(
1265 CreateAndTickConfig {
1266 execution_id: ExecutionId::generate(),
1267 created_at,
1268 executed_at: created_at,
1269 },
1270 clock_fn,
1271 db_pool.connection_test().await.unwrap().as_ref(),
1272 db_pool.clone(),
1273 exec_config.clone(),
1274 worker,
1275 tick_fn,
1276 )
1277 .await;
1278 assert_eq!(3, execution_log.events.len());
1279 let (reason, kind, detail) = assert_matches!(
1280 &execution_log.events.get(2).unwrap(),
1281 ExecutionEvent {
1282 event: ExecutionRequest::Finished{
1283 result: SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError{reason, kind, detail}),
1284 http_client_traces: None
1285 },
1286 created_at: at,
1287 backtrace_id: None,
1288 version: Version(2),
1289 } if *at == created_at
1290 => (reason, kind, detail)
1291 );
1292
1293 assert_eq!(Some(expected_reason), *reason);
1294 assert_eq!(Some(expected_detail), detail.as_deref());
1295 assert_eq!(ExecutionFailureKind::Uncategorized, *kind);
1296
1297 db_close.close().await;
1298 }
1299
1300 #[tokio::test]
1301 async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1302 let worker_error = WorkerError::ActivityTrap {
1303 reason: "error reason".to_string(),
1304 trap_kind: TrapKind::Trap,
1305 detail: Some("detail".to_string()),
1306 version: Version::new(2),
1307 http_client_traces: None,
1308 };
1309 let expected_child_err = FinishedExecutionError {
1310 kind: ExecutionFailureKind::Uncategorized,
1311 reason: Some("activity trap: error reason".to_string()),
1312 detail: Some("detail".to_string()),
1313 };
1314 child_execution_permanently_failed_should_notify_parent(
1315 WorkerResult::Err(worker_error),
1316 expected_child_err,
1317 )
1318 .await;
1319 }
1320
1321 #[tokio::test]
1322 async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1323 let expected_child_err = FinishedExecutionError {
1324 kind: ExecutionFailureKind::TimedOut,
1325 reason: None,
1326 detail: None,
1327 };
1328 child_execution_permanently_failed_should_notify_parent(
1329 WorkerResult::DbUpdatedByWorkerOrWatcher,
1330 expected_child_err,
1331 )
1332 .await;
1333 }
1334
1335 async fn child_execution_permanently_failed_should_notify_parent(
1336 worker_result: WorkerResult,
1337 expected_child_err: FinishedExecutionError,
1338 ) {
1339 use concepts::storage::JoinSetResponseEventOuter;
1340 const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1341
1342 set_up();
1343 let sim_clock = SimClock::default();
1344 let (_guard, db_pool, db_close) = Database::Memory.set_up().await;
1345
1346 let parent_worker = Arc::new(SimpleWorker::with_single_result(
1347 WorkerResult::DbUpdatedByWorkerOrWatcher,
1348 ));
1349 let parent_execution_id = ExecutionId::generate();
1350 db_pool
1351 .connection()
1352 .await
1353 .unwrap()
1354 .create(CreateRequest {
1355 created_at: sim_clock.now(),
1356 execution_id: parent_execution_id.clone(),
1357 ffqn: FFQN_SOME,
1358 params: Params::empty(),
1359 parent: None,
1360 metadata: concepts::ExecutionMetadata::empty(),
1361 scheduled_at: sim_clock.now(),
1362 component_id: ComponentId::dummy_activity(),
1363 scheduled_by: None,
1364 })
1365 .await
1366 .unwrap();
1367 let parent_executor_id = ExecutorId::generate();
1368 tick_fn(
1369 ExecConfig {
1370 batch_size: 1,
1371 lock_expiry: LOCK_EXPIRY,
1372 tick_sleep: Duration::ZERO,
1373 component_id: ComponentId::dummy_activity(),
1374 task_limiter: None,
1375 executor_id: parent_executor_id,
1376 retry_config: ComponentRetryConfig::ZERO,
1377 locking_strategy: LockingStrategy::default(),
1378 },
1379 sim_clock.clone(),
1380 db_pool.clone(),
1381 parent_worker,
1382 sim_clock.now(),
1383 )
1384 .await;
1385
1386 let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1387 let child_execution_id = parent_execution_id.next_level(&join_set_id);
1388 {
1390 let params = Params::empty();
1391 let child = CreateRequest {
1392 created_at: sim_clock.now(),
1393 execution_id: ExecutionId::Derived(child_execution_id.clone()),
1394 ffqn: FFQN_CHILD,
1395 params: params.clone(),
1396 parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1397 metadata: concepts::ExecutionMetadata::empty(),
1398 scheduled_at: sim_clock.now(),
1399 component_id: ComponentId::dummy_activity(),
1400 scheduled_by: None,
1401 };
1402 let current_time = sim_clock.now();
1403 let join_set = AppendRequest {
1404 created_at: current_time,
1405 event: ExecutionRequest::HistoryEvent {
1406 event: HistoryEvent::JoinSetCreate {
1407 join_set_id: join_set_id.clone(),
1408 },
1409 },
1410 };
1411 let child_exec_req = AppendRequest {
1412 created_at: current_time,
1413 event: ExecutionRequest::HistoryEvent {
1414 event: HistoryEvent::JoinSetRequest {
1415 join_set_id: join_set_id.clone(),
1416 request: JoinSetRequest::ChildExecutionRequest {
1417 child_execution_id: child_execution_id.clone(),
1418 target_ffqn: FFQN_CHILD,
1419 params,
1420 },
1421 },
1422 },
1423 };
1424 let join_next = AppendRequest {
1425 created_at: current_time,
1426 event: ExecutionRequest::HistoryEvent {
1427 event: HistoryEvent::JoinNext {
1428 join_set_id: join_set_id.clone(),
1429 run_expires_at: sim_clock.now(),
1430 closing: false,
1431 requested_ffqn: Some(FFQN_CHILD),
1432 },
1433 },
1434 };
1435 db_pool
1436 .connection()
1437 .await
1438 .unwrap()
1439 .append_batch_create_new_execution(
1440 current_time,
1441 vec![join_set, child_exec_req, join_next],
1442 parent_execution_id.clone(),
1443 Version::new(2),
1444 vec![child],
1445 )
1446 .await
1447 .unwrap();
1448 }
1449
1450 let child_worker =
1451 Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));
1452
1453 tick_fn(
1455 ExecConfig {
1456 batch_size: 1,
1457 lock_expiry: LOCK_EXPIRY,
1458 tick_sleep: Duration::ZERO,
1459 component_id: ComponentId::dummy_activity(),
1460 task_limiter: None,
1461 executor_id: ExecutorId::generate(),
1462 retry_config: ComponentRetryConfig::ZERO,
1463 locking_strategy: LockingStrategy::default(),
1464 },
1465 sim_clock.clone(),
1466 db_pool.clone(),
1467 child_worker,
1468 sim_clock.now(),
1469 )
1470 .await;
1471 if matches!(expected_child_err.kind, ExecutionFailureKind::TimedOut) {
1472 sim_clock.move_time_forward(LOCK_EXPIRY);
1474 expired_timers_watcher::tick(
1475 db_pool.connection().await.unwrap().as_ref(),
1476 sim_clock.now(),
1477 )
1478 .await
1479 .unwrap();
1480 }
1481 let child_log = db_pool
1482 .connection_test()
1483 .await
1484 .unwrap()
1485 .get(&ExecutionId::Derived(child_execution_id.clone()))
1486 .await
1487 .unwrap();
1488 assert!(child_log.pending_state.is_finished());
1489 assert_eq!(
1490 Version(2),
1491 child_log.next_version,
1492 "created = 0, locked = 1, with_single_result = 2"
1493 );
1494 assert_eq!(
1495 ExecutionRequest::Finished {
1496 result: SupportedFunctionReturnValue::ExecutionError(expected_child_err),
1497 http_client_traces: None
1498 },
1499 child_log.last_event().event
1500 );
1501 let parent_log = db_pool
1502 .connection_test()
1503 .await
1504 .unwrap()
1505 .get(&parent_execution_id)
1506 .await
1507 .unwrap();
1508 assert_matches!(
1509 parent_log.pending_state,
1510 PendingState::PendingAt {
1511 scheduled_at,
1512 last_lock: Some(LockedBy { executor_id: found_executor_id, run_id: _}),
1513 component_id_input_digest: _
1514 } if scheduled_at == sim_clock.now() && found_executor_id == parent_executor_id,
1515 "parent should be back to pending"
1516 );
1517 let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1518 parent_log.responses.last(),
1519 Some(JoinSetResponseEventOuter{
1520 created_at: at,
1521 event: JoinSetResponseEvent{
1522 join_set_id: found_join_set_id,
1523 event: JoinSetResponse::ChildExecutionFinished {
1524 child_execution_id: found_child_execution_id,
1525 finished_version,
1526 result: found_result,
1527 }
1528 }
1529 })
1530 if *at == sim_clock.now()
1531 => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1532 );
1533 assert_eq!(join_set_id, *found_join_set_id);
1534 assert_eq!(child_execution_id, *found_child_execution_id);
1535 assert_eq!(child_log.next_version, *child_finished_version);
1536 assert_matches!(
1537 found_result,
1538 SupportedFunctionReturnValue::ExecutionError(_)
1539 );
1540
1541 db_close.close().await;
1542 }
1543
1544 #[derive(Clone, Debug)]
1545 struct SleepyWorker {
1546 duration: Duration,
1547 result: SupportedFunctionReturnValue,
1548 exported: [FunctionMetadata; 1],
1549 }
1550
1551 #[async_trait]
1552 impl Worker for SleepyWorker {
1553 async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1554 tokio::time::sleep(self.duration).await;
1555 WorkerResult::Ok(self.result.clone(), ctx.version, None)
1556 }
1557
1558 fn exported_functions(&self) -> &[FunctionMetadata] {
1559 &self.exported
1560 }
1561 }
1562
1563 #[tokio::test]
1564 async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1565 set_up();
1566 let sim_clock = SimClock::default();
1567 let (_guard, db_pool, db_close) = Database::Memory.set_up().await;
1568 let lock_expiry = Duration::from_millis(100);
1569 let timeout_duration = Duration::from_millis(300);
1570 let retry_config = ComponentRetryConfig {
1571 max_retries: Some(1),
1572 retry_exp_backoff: timeout_duration,
1573 };
1574 let exec_config = ExecConfig {
1575 batch_size: 1,
1576 lock_expiry,
1577 tick_sleep: Duration::ZERO,
1578 component_id: ComponentId::dummy_activity(),
1579 task_limiter: None,
1580 executor_id: ExecutorId::generate(),
1581 retry_config,
1582 locking_strategy: LockingStrategy::default(),
1583 };
1584
1585 let worker = Arc::new(SleepyWorker {
1586 duration: lock_expiry + Duration::from_millis(1), result: SUPPORTED_RETURN_VALUE_OK_EMPTY,
1588 exported: [FunctionMetadata {
1589 ffqn: FFQN_SOME,
1590 parameter_types: ParameterTypes::default(),
1591 return_type: RETURN_TYPE_DUMMY,
1592 extension: None,
1593 submittable: true,
1594 }],
1595 });
1596 let execution_id = ExecutionId::generate();
1598 let db_connection = db_pool.connection_test().await.unwrap();
1599 db_connection
1600 .create(CreateRequest {
1601 created_at: sim_clock.now(),
1602 execution_id: execution_id.clone(),
1603 ffqn: FFQN_SOME,
1604 params: Params::empty(),
1605 parent: None,
1606 metadata: concepts::ExecutionMetadata::empty(),
1607 scheduled_at: sim_clock.now(),
1608 component_id: ComponentId::dummy_activity(),
1609 scheduled_by: None,
1610 })
1611 .await
1612 .unwrap();
1613
1614 let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
1615 let executor = ExecTask::new_test(
1616 worker,
1617 exec_config.clone(),
1618 sim_clock.clone(),
1619 db_pool.clone(),
1620 ffqns,
1621 );
1622 let db_exec = db_pool.db_exec_conn().await.unwrap();
1623 let mut first_execution_progress = executor
1624 .tick(db_exec.as_ref(), sim_clock.now(), RunId::generate())
1625 .await
1626 .unwrap();
1627 assert_eq!(1, first_execution_progress.executions.len());
1628 sim_clock.move_time_forward(lock_expiry);
1630 let now_after_first_lock_expiry = sim_clock.now();
1632 {
1633 debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1634 let cleanup_progress = executor
1635 .tick(
1636 db_pool.db_exec_conn().await.unwrap().as_ref(),
1637 now_after_first_lock_expiry,
1638 RunId::generate(),
1639 )
1640 .await
1641 .unwrap();
1642 assert!(cleanup_progress.executions.is_empty());
1643 }
1644 {
1645 let expired_locks = expired_timers_watcher::tick(
1646 db_pool.connection().await.unwrap().as_ref(),
1647 now_after_first_lock_expiry,
1648 )
1649 .await
1650 .unwrap()
1651 .expired_locks;
1652 assert_eq!(1, expired_locks);
1653 }
1654 assert!(
1655 !first_execution_progress
1656 .executions
1657 .pop()
1658 .unwrap()
1659 .1
1660 .is_finished()
1661 );
1662
1663 let execution_log = db_connection.get(&execution_id).await.unwrap();
1664 let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1665 assert_matches!(
1666 &execution_log.events.get(2).unwrap(),
1667 ExecutionEvent {
1668 event: ExecutionRequest::TemporarilyTimedOut { backoff_expires_at, .. },
1669 created_at: at,
1670 backtrace_id: None,
1671 version: Version(2),
1672 } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1673 );
1674 assert_matches!(
1675 execution_log.pending_state,
1676 PendingState::PendingAt {
1677 scheduled_at: found_scheduled_by,
1678 last_lock: Some(LockedBy {
1679 executor_id: found_executor_id,
1680 run_id: _,
1681 }),
1682 component_id_input_digest: _
1683 } if found_scheduled_by == expected_first_timeout_expiry && found_executor_id == exec_config.executor_id
1684 );
1685 sim_clock.move_time_forward(timeout_duration);
1686 let now_after_first_timeout = sim_clock.now();
1687 debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1688
1689 let mut second_execution_progress = executor
1690 .tick(
1691 db_pool.db_exec_conn().await.unwrap().as_ref(),
1692 now_after_first_timeout,
1693 RunId::generate(),
1694 )
1695 .await
1696 .unwrap();
1697 assert_eq!(1, second_execution_progress.executions.len());
1698
1699 sim_clock.move_time_forward(lock_expiry);
1701 let now_after_second_lock_expiry = sim_clock.now();
1703 debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1704 {
1705 let cleanup_progress = executor
1706 .tick(
1707 db_pool.db_exec_conn().await.unwrap().as_ref(),
1708 now_after_second_lock_expiry,
1709 RunId::generate(),
1710 )
1711 .await
1712 .unwrap();
1713 assert!(cleanup_progress.executions.is_empty());
1714 }
1715 {
1716 let expired_locks = expired_timers_watcher::tick(
1717 db_pool.connection().await.unwrap().as_ref(),
1718 now_after_second_lock_expiry,
1719 )
1720 .await
1721 .unwrap()
1722 .expired_locks;
1723 assert_eq!(1, expired_locks);
1724 }
1725 assert!(
1726 !second_execution_progress
1727 .executions
1728 .pop()
1729 .unwrap()
1730 .1
1731 .is_finished()
1732 );
1733
1734 drop(db_connection);
1735 drop(executor);
1736 db_close.close().await;
1737 }
1738}