1use crate::worker::{FatalError, Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::prefixed_ulid::RunId;
5use concepts::storage::{
6 AppendEventsToExecution, AppendRequest, AppendResponseToExecution, DbErrorGeneric,
7 DbErrorWrite, DbExecutor, ExecutionLog, LockedExecution,
8};
9use concepts::time::{ClockFn, Sleep};
10use concepts::{
11 ComponentId, ComponentRetryConfig, FunctionMetadata, StrVariant, SupportedFunctionReturnValue,
12};
13use concepts::{ExecutionFailureKind, JoinSetId};
14use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
15use concepts::{
16 FinishedExecutionError,
17 storage::{ExecutionEventInner, Version},
18};
19use std::{
20 sync::{
21 Arc,
22 atomic::{AtomicBool, Ordering},
23 },
24 time::Duration,
25};
26use tokio::task::{AbortHandle, JoinHandle};
27use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
28
/// Static configuration of a single executor.
#[derive(Debug, Clone)]
pub struct ExecConfig {
    /// Added to a tick's `executed_at` to compute the lock expiry passed to `lock_pending`.
    pub lock_expiry: Duration,
    /// Duration of the sleep future handed to `wait_for_pending` between ticks.
    pub tick_sleep: Duration,
    /// Maximum number of executions locked (and worker tasks spawned) per tick.
    pub batch_size: u32,
    /// Component identifier passed to `lock_pending`.
    pub component_id: ComponentId,
    /// Optional semaphore shared across executors; every spawned worker task holds one permit.
    /// `None` means the only limit is `batch_size`.
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
    pub executor_id: ExecutorId,
    /// Retry policy used to compute backoff after temporary failures and timeouts.
    pub retry_config: ComponentRetryConfig,
}
39
/// Locks pending executions of `ffqns` and runs each of them on `worker`.
pub struct ExecTask<C: ClockFn> {
    worker: Arc<dyn Worker>,
    config: ExecConfig,
    clock_fn: C,
    db_exec: Arc<dyn DbExecutor>,
    /// Function names this executor asks the database to lock.
    ffqns: Arc<[FunctionFqn]>,
}
47
/// Worker tasks spawned by a single tick, paired with the execution each one runs.
#[derive(derive_more::Debug, Default)]
pub struct ExecutionProgress {
    #[debug(skip)]
    // Only read by the test-only `wait_for_tasks`, so non-test builds would flag it as unused.
    #[allow(dead_code)]
    executions: Vec<(ExecutionId, JoinHandle<()>)>,
}
54
55impl ExecutionProgress {
56 #[cfg(feature = "test")]
57 pub async fn wait_for_tasks(self) -> Vec<ExecutionId> {
58 let mut vec = Vec::new();
59 for (exe, join_handle) in self.executions {
60 vec.push(exe);
61 join_handle.await.unwrap();
62 }
63 vec
64 }
65}
66
/// Handle to an executor loop started by `ExecTask::spawn_new`.
///
/// Call `close` for a graceful shutdown; dropping the handle while the loop is still
/// running aborts the task.
#[derive(derive_more::Debug)]
pub struct ExecutorTaskHandle {
    /// Set by `close`; the executor loop checks it before every iteration.
    #[debug(skip)]
    is_closing: Arc<AtomicBool>,
    /// Abort handle of the spawned executor loop (not of the worker tasks it spawns).
    #[debug(skip)]
    abort_handle: AbortHandle,
    component_id: ComponentId,
    executor_id: ExecutorId,
}
76
77impl ExecutorTaskHandle {
78 #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
79 pub async fn close(&self) {
80 trace!("Gracefully closing");
81 self.is_closing.store(true, Ordering::Relaxed);
89 while !self.abort_handle.is_finished() {
90 tokio::time::sleep(Duration::from_millis(1)).await;
91 }
92 debug!("Gracefully closed");
93 }
94}
95
96impl Drop for ExecutorTaskHandle {
97 #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
98 fn drop(&mut self) {
99 if self.abort_handle.is_finished() {
100 return;
101 }
102 warn!("Aborting the executor task");
103 self.abort_handle.abort();
104 }
105}
106
/// Test-only entry point to `extract_exported_ffqns_noext`.
#[cfg(feature = "test")]
pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    self::extract_exported_ffqns_noext(worker)
}
111
112fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
113 worker
114 .exported_functions()
115 .iter()
116 .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
117 .collect::<Arc<_>>()
118}
119
120impl<C: ClockFn + 'static> ExecTask<C> {
121 #[cfg(feature = "test")]
122 pub fn new_test(
123 worker: Arc<dyn Worker>,
124 config: ExecConfig,
125 clock_fn: C,
126 db_exec: Arc<dyn DbExecutor>,
127 ffqns: Arc<[FunctionFqn]>,
128 ) -> Self {
129 Self {
130 worker,
131 config,
132 clock_fn,
133 db_exec,
134 ffqns,
135 }
136 }
137
138 #[cfg(feature = "test")]
139 pub fn new_all_ffqns_test(
140 worker: Arc<dyn Worker>,
141 config: ExecConfig,
142 clock_fn: C,
143 db_exec: Arc<dyn DbExecutor>,
144 ) -> Self {
145 let ffqns = extract_exported_ffqns_noext(worker.as_ref());
146 Self {
147 worker,
148 config,
149 clock_fn,
150 db_exec,
151 ffqns,
152 }
153 }
154
155 pub fn spawn_new(
156 worker: Arc<dyn Worker>,
157 config: ExecConfig,
158 clock_fn: C,
159 db_exec: Arc<dyn DbExecutor>,
160 sleep: impl Sleep + 'static,
161 ) -> ExecutorTaskHandle {
162 let is_closing = Arc::new(AtomicBool::default());
163 let is_closing_inner = is_closing.clone();
164 let ffqns = extract_exported_ffqns_noext(worker.as_ref());
165 let component_id = config.component_id.clone();
166 let executor_id = config.executor_id;
167 let abort_handle = tokio::spawn(async move {
168 debug!(executor_id = %config.executor_id, component_id = %config.component_id, "Spawned executor");
169 let task = ExecTask {
170 worker,
171 config,
172 db_exec,
173 ffqns: ffqns.clone(),
174 clock_fn: clock_fn.clone(),
175 };
176 while !is_closing_inner.load(Ordering::Relaxed) {
177 let _ = task.tick(clock_fn.now(), RunId::generate()).await;
178
179 task.db_exec
180 .wait_for_pending(clock_fn.now(), ffqns.clone(), {
181 let sleep = sleep.clone();
182 Box::pin(async move { sleep.sleep(task.config.tick_sleep).await })})
183 .await;
184 }
185 })
186 .abort_handle();
187 ExecutorTaskHandle {
188 is_closing,
189 abort_handle,
190 component_id,
191 executor_id,
192 }
193 }
194
195 fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
196 if let Some(task_limiter) = &self.config.task_limiter {
197 let mut locks = Vec::new();
198 for _ in 0..self.config.batch_size {
199 if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
200 locks.push(Some(permit));
201 } else {
202 break;
203 }
204 }
205 locks
206 } else {
207 let mut vec = Vec::with_capacity(self.config.batch_size as usize);
208 for _ in 0..self.config.batch_size {
209 vec.push(None);
210 }
211 vec
212 }
213 }
214
215 #[cfg(feature = "test")]
216 pub async fn tick_test(&self, executed_at: DateTime<Utc>, run_id: RunId) -> ExecutionProgress {
217 self.tick(executed_at, run_id).await.unwrap()
218 }
219
220 #[cfg(feature = "test")]
221 pub async fn tick_test_await(
222 &self,
223 executed_at: DateTime<Utc>,
224 run_id: RunId,
225 ) -> Vec<ExecutionId> {
226 self.tick(executed_at, run_id)
227 .await
228 .unwrap()
229 .wait_for_tasks()
230 .await
231 }
232
233 #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.config.executor_id, component_id = %self.config.component_id))]
234 async fn tick(
235 &self,
236 executed_at: DateTime<Utc>,
237 run_id: RunId,
238 ) -> Result<ExecutionProgress, DbErrorGeneric> {
239 let locked_executions = {
240 let mut permits = self.acquire_task_permits();
241 if permits.is_empty() {
242 return Ok(ExecutionProgress::default());
243 }
244 let lock_expires_at = executed_at + self.config.lock_expiry;
245 let locked_executions = self
246 .db_exec
247 .lock_pending(
248 permits.len(), executed_at, self.ffqns.clone(),
251 executed_at, self.config.component_id.clone(),
253 self.config.executor_id,
254 lock_expires_at,
255 run_id,
256 self.config.retry_config,
257 )
258 .await?;
259 while permits.len() > locked_executions.len() {
261 permits.pop();
262 }
263 assert_eq!(permits.len(), locked_executions.len());
264 locked_executions.into_iter().zip(permits)
265 };
266
267 let mut executions = Vec::with_capacity(locked_executions.len());
268 for (locked_execution, permit) in locked_executions {
269 let execution_id = locked_execution.execution_id.clone();
270 let join_handle = {
271 let worker = self.worker.clone();
272 let db = self.db_exec.clone();
273 let clock_fn = self.clock_fn.clone();
274 let worker_span = info_span!(parent: None, "worker",
275 "otel.name" = format!("worker {}", locked_execution.ffqn),
276 %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.config.executor_id, component_id = %self.config.component_id);
277 locked_execution.metadata.enrich(&worker_span);
278 tokio::spawn({
279 let worker_span2 = worker_span.clone();
280 let retry_config = self.config.retry_config;
281 async move {
282 let _permit = permit;
283 let res = Self::run_worker(
284 worker,
285 db.as_ref(),
286 clock_fn,
287 locked_execution,
288 retry_config,
289 worker_span2,
290 )
291 .await;
292 if let Err(db_error) = res {
293 error!("Got db error `{db_error:?}`, expecting watcher to mark execution as timed out");
294 }
295 }
296 .instrument(worker_span)
297 })
298 };
299 executions.push((execution_id, join_handle));
300 }
301 Ok(ExecutionProgress { executions })
302 }
303
304 async fn run_worker(
305 worker: Arc<dyn Worker>,
306 db_exec: &dyn DbExecutor,
307 clock_fn: C,
308 locked_execution: LockedExecution,
309 retry_config: ComponentRetryConfig,
310 worker_span: Span,
311 ) -> Result<(), DbErrorWrite> {
312 debug!("Worker::run starting");
313 trace!(
314 version = %locked_execution.next_version,
315 params = ?locked_execution.params,
316 event_history = ?locked_execution.event_history,
317 "Worker::run starting"
318 );
319 let can_be_retried = ExecutionLog::can_be_retried_after(
320 locked_execution.intermittent_event_count + 1,
321 retry_config.max_retries,
322 retry_config.retry_exp_backoff,
323 );
324 let unlock_expiry_on_limit_reached =
325 ExecutionLog::compute_retry_duration_when_retrying_forever(
326 locked_execution.intermittent_event_count + 1,
327 retry_config.retry_exp_backoff,
328 );
329 let ctx = WorkerContext {
330 execution_id: locked_execution.execution_id.clone(),
331 metadata: locked_execution.metadata,
332 ffqn: locked_execution.ffqn,
333 params: locked_execution.params,
334 event_history: locked_execution.event_history,
335 responses: locked_execution
336 .responses
337 .into_iter()
338 .map(|outer| outer.event)
339 .collect(),
340 version: locked_execution.next_version,
341 can_be_retried: can_be_retried.is_some(),
342 locked_event: locked_execution.locked_event,
343 worker_span,
344 };
345 let worker_result = worker.run(ctx).await;
346 trace!(?worker_result, "Worker::run finished");
347 let result_obtained_at = clock_fn.now();
348 match Self::worker_result_to_execution_event(
349 locked_execution.execution_id,
350 worker_result,
351 result_obtained_at,
352 locked_execution.parent,
353 can_be_retried,
354 unlock_expiry_on_limit_reached,
355 )? {
356 Some(append) => {
357 trace!("Appending {append:?}");
358 match append {
359 AppendOrCancel::Cancel {
360 execution_id,
361 cancelled_at,
362 } => db_exec
363 .cancel_activity_with_retries(&execution_id, cancelled_at)
364 .await
365 .map(|_| ()),
366 AppendOrCancel::Other(append) => append.append(db_exec).await,
367 }
368 }
369 None => Ok(()),
370 }
371 }
372
373 fn worker_result_to_execution_event(
375 execution_id: ExecutionId,
376 worker_result: WorkerResult,
377 result_obtained_at: DateTime<Utc>,
378 parent: Option<(ExecutionId, JoinSetId)>,
379 can_be_retried: Option<Duration>,
380 unlock_expiry_on_limit_reached: Duration,
381 ) -> Result<Option<AppendOrCancel>, DbErrorWrite> {
382 Ok(match worker_result {
383 WorkerResult::Ok(result, new_version, http_client_traces) => {
384 info!("Execution finished: {result}");
385 let child_finished =
386 parent.map(
387 |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
388 parent_execution_id,
389 parent_join_set,
390 result: result.clone(),
391 },
392 );
393 let primary_event = AppendRequest {
394 created_at: result_obtained_at,
395 event: ExecutionEventInner::Finished {
396 result,
397 http_client_traces,
398 },
399 };
400
401 Some(AppendOrCancel::Other(Append {
402 created_at: result_obtained_at,
403 primary_event,
404 execution_id,
405 version: new_version,
406 child_finished,
407 }))
408 }
409 WorkerResult::DbUpdatedByWorkerOrWatcher => None,
410 WorkerResult::Err(err) => {
411 let reason_generic = err.to_string(); let (primary_event, child_finished, version) = match err {
414 WorkerError::TemporaryTimeout {
415 http_client_traces,
416 version,
417 } => {
418 if let Some(duration) = can_be_retried {
419 let backoff_expires_at = result_obtained_at + duration;
420 info!(
421 "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
422 );
423 (
424 ExecutionEventInner::TemporarilyTimedOut {
425 backoff_expires_at,
426 http_client_traces,
427 },
428 None,
429 version,
430 )
431 } else {
432 info!("Execution timed out");
433 let result = SupportedFunctionReturnValue::ExecutionError(
434 FinishedExecutionError {
435 kind: ExecutionFailureKind::TimedOut,
436 reason: None,
437 detail: None,
438 },
439 );
440 let child_finished =
441 parent.map(|(parent_execution_id, parent_join_set)| {
442 ChildFinishedResponse {
443 parent_execution_id,
444 parent_join_set,
445 result: result.clone(),
446 }
447 });
448 (
449 ExecutionEventInner::Finished {
450 result,
451 http_client_traces,
452 },
453 child_finished,
454 version,
455 )
456 }
457 }
458 WorkerError::DbError(db_error) => {
459 return Err(db_error);
460 }
461 WorkerError::ActivityTrap {
462 reason: _, trap_kind,
464 detail,
465 version,
466 http_client_traces,
467 } => {
468 if let Some(duration) = can_be_retried {
469 let expires_at = result_obtained_at + duration;
470 debug!(
471 "Retrying activity with `{trap_kind}` execution after {duration:?} at {expires_at}"
472 );
473 (
474 ExecutionEventInner::TemporarilyFailed {
475 reason: StrVariant::from(reason_generic),
476 backoff_expires_at: expires_at,
477 detail,
478 http_client_traces,
479 },
480 None,
481 version,
482 )
483 } else {
484 info!(
485 "Activity with `{trap_kind}` marked as permanent failure - {reason_generic}"
486 );
487 let result = SupportedFunctionReturnValue::ExecutionError(
488 FinishedExecutionError {
489 reason: Some(reason_generic),
490 kind: ExecutionFailureKind::Uncategorized,
491 detail,
492 },
493 );
494 let child_finished =
495 parent.map(|(parent_execution_id, parent_join_set)| {
496 ChildFinishedResponse {
497 parent_execution_id,
498 parent_join_set,
499 result: result.clone(),
500 }
501 });
502 (
503 ExecutionEventInner::Finished {
504 result,
505 http_client_traces,
506 },
507 child_finished,
508 version,
509 )
510 }
511 }
512 WorkerError::ActivityPreopenedDirError {
513 reason,
514 detail,
515 version,
516 } => {
517 let http_client_traces = None;
518 if let Some(duration) = can_be_retried {
519 let expires_at = result_obtained_at + duration;
520 debug!(
521 "Retrying activity with ActivityPreopenedDirError `{reason}` execution after {duration:?} at {expires_at}"
522 );
523 (
524 ExecutionEventInner::TemporarilyFailed {
525 reason: StrVariant::from(reason_generic),
526 backoff_expires_at: expires_at,
527 detail: Some(detail),
528 http_client_traces,
529 },
530 None,
531 version,
532 )
533 } else {
534 info!(
535 "Activity with ActivityPreopenedDirError `{reason}` marked as permanent failure - {reason_generic}"
536 );
537 let result = SupportedFunctionReturnValue::ExecutionError(
538 FinishedExecutionError {
539 reason: Some(reason_generic),
540 kind: ExecutionFailureKind::Uncategorized,
541 detail: Some(detail),
542 },
543 );
544 let child_finished =
545 parent.map(|(parent_execution_id, parent_join_set)| {
546 ChildFinishedResponse {
547 parent_execution_id,
548 parent_join_set,
549 result: result.clone(),
550 }
551 });
552 (
553 ExecutionEventInner::Finished {
554 result,
555 http_client_traces,
556 },
557 child_finished,
558 version,
559 )
560 }
561 }
562 WorkerError::ActivityReturnedError {
563 detail,
564 version,
565 http_client_traces,
566 } => {
567 let duration = can_be_retried.expect(
568 "ActivityReturnedError must not be returned when retries are exhausted",
569 );
570 let expires_at = result_obtained_at + duration;
571 debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
572 (
573 ExecutionEventInner::TemporarilyFailed {
574 backoff_expires_at: expires_at,
575 reason: StrVariant::Static("activity returned error"), detail, http_client_traces,
578 },
579 None,
580 version,
581 )
582 }
583 WorkerError::LimitReached {
584 reason,
585 version: new_version,
586 } => {
587 let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
588 warn!(
589 "Limit reached: {reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
590 );
591 (
592 ExecutionEventInner::Unlocked {
593 backoff_expires_at: expires_at,
594 reason: StrVariant::from(reason),
595 },
596 None,
597 new_version,
598 )
599 }
600 WorkerError::FatalError(FatalError::Cancelled, _version) => {
601 return Ok(Some(AppendOrCancel::Cancel {
603 execution_id,
604 cancelled_at: result_obtained_at,
605 }));
606 }
607 WorkerError::FatalError(fatal_error, version) => {
608 warn!("Fatal worker error - {fatal_error:?}");
609 let result = SupportedFunctionReturnValue::ExecutionError(
610 FinishedExecutionError::from(fatal_error),
611 );
612 let child_finished =
613 parent.map(|(parent_execution_id, parent_join_set)| {
614 ChildFinishedResponse {
615 parent_execution_id,
616 parent_join_set,
617 result: result.clone(),
618 }
619 });
620 (
621 ExecutionEventInner::Finished {
622 result,
623 http_client_traces: None,
624 },
625 child_finished,
626 version,
627 )
628 }
629 };
630 Some(AppendOrCancel::Other(Append {
631 created_at: result_obtained_at,
632 primary_event: AppendRequest {
633 created_at: result_obtained_at,
634 event: primary_event,
635 },
636 execution_id,
637 version,
638 child_finished,
639 }))
640 }
641 })
642 }
643}
644
/// Result of a finished child execution, to be delivered to the parent's join set.
#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    pub(crate) parent_execution_id: ExecutionId,
    /// Join set of the parent that the response is recorded under.
    pub(crate) parent_join_set: JoinSetId,
    pub(crate) result: SupportedFunctionReturnValue,
}
651
/// What to persist after a worker run: either cancel the execution or append events.
#[derive(Debug, Clone)]
// `Other(Append)` is larger than `Cancel`; the lint is acknowledged rather than boxing the variant.
#[expect(clippy::large_enum_variant)]
pub(crate) enum AppendOrCancel {
    /// Produced when the worker reports `FatalError::Cancelled`; persisted via
    /// `cancel_activity_with_retries`.
    Cancel {
        execution_id: ExecutionId,
        cancelled_at: DateTime<Utc>,
    },
    /// An event append, persisted via `Append::append`.
    Other(Append),
}
661
/// A primary event to append to `execution_id` at `version`, optionally together with
/// the response that must be delivered to the parent execution.
#[derive(Debug, Clone)]
pub(crate) struct Append {
    /// Timestamp of the parent response and of the batch append.
    pub(crate) created_at: DateTime<Utc>,
    pub(crate) primary_event: AppendRequest,
    pub(crate) execution_id: ExecutionId,
    /// Version at which `primary_event` is appended; also reported to the parent as `finished_version`.
    pub(crate) version: Version,
    /// When set, `primary_event` must be `Finished` and `execution_id` must be derived
    /// (both are asserted in `append`).
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}
670
671impl Append {
672 pub(crate) async fn append(self, db_exec: &dyn DbExecutor) -> Result<(), DbErrorWrite> {
673 if let Some(child_finished) = self.child_finished {
674 assert_matches!(
675 &self.primary_event,
676 AppendRequest {
677 event: ExecutionEventInner::Finished { .. },
678 ..
679 }
680 );
681 let child_execution_id = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
682 let events = AppendEventsToExecution {
683 execution_id: self.execution_id,
684 version: self.version.clone(),
685 batch: vec![self.primary_event],
686 };
687 let response = AppendResponseToExecution {
688 parent_execution_id: child_finished.parent_execution_id,
689 created_at: self.created_at,
690 join_set_id: child_finished.parent_join_set,
691 child_execution_id,
692 finished_version: self.version, result: child_finished.result,
694 };
695
696 db_exec
697 .append_batch_respond_to_parent(events, response, self.created_at)
698 .await?;
699 } else {
700 db_exec
701 .append(self.execution_id, self.version, self.primary_event)
702 .await?;
703 }
704 Ok(())
705 }
706}
707
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes, RETURN_TYPE_DUMMY,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");

    /// Scripted runs keyed by the version the worker expects; `run` pops from the back.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    /// Test worker that replays scripted results.
    ///
    /// Each `run` pops the last entry of `worker_results_rev` and asserts that it is
    /// called with the expected version and event history.
    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    /// Metadata of the single parameterless, submittable function a `SimpleWorker` exports.
    fn exported_metadata(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
        [FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: RETURN_TYPE_DUMMY,
            extension: None,
            submittable: true,
        }]
    }

    impl SimpleWorker {
        /// Worker expecting exactly one run at version 2 with an empty event history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let scripted = IndexMap::from([(Version::new(2), (vec![], res))]);
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(scripted)))
        }

        /// Replaces the exported function name, keeping the scripted results.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            Self {
                exported: exported_metadata(ffqn.clone()),
                ffqn,
                ..self
            }
        }

        /// Worker exporting `FFQN_SOME` and replaying `worker_results_rev`.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: exported_metadata(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            // The mutex guard is a temporary, released at the end of this statement.
            let next_scripted = self.worker_results_rev.lock().unwrap().pop();
            let (expected_version, (expected_eh, worker_result)) = next_scripted.unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            let actual_eh: Vec<_> = ctx
                .event_history
                .iter()
                .map(|(event, _version)| event.clone())
                .collect();
            assert_eq!(expected_eh, actual_eh);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}
793
794#[cfg(test)]
795mod tests {
796 use self::simple_worker::SimpleWorker;
797 use super::*;
798 use crate::{expired_timers_watcher, worker::WorkerResult};
799 use assert_matches::assert_matches;
800 use async_trait::async_trait;
801 use concepts::storage::{
802 CreateRequest, DbConnection, JoinSetRequest, JoinSetResponse, JoinSetResponseEvent,
803 };
804 use concepts::storage::{DbPoolCloseable, LockedBy};
805 use concepts::storage::{ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState};
806 use concepts::time::Now;
807 use concepts::{
808 FunctionMetadata, JoinSetKind, ParameterTypes, Params, RETURN_TYPE_DUMMY,
809 SUPPORTED_RETURN_VALUE_OK_EMPTY, StrVariant, SupportedFunctionReturnValue, TrapKind,
810 };
811 use db_tests::Database;
812 use indexmap::IndexMap;
813 use simple_worker::FFQN_SOME;
814 use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
815 use test_utils::set_up;
816 use test_utils::sim_clock::{ConstClock, SimClock};
817
818 pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
819
820 async fn tick_fn<W: Worker + Debug, C: ClockFn + 'static>(
821 config: ExecConfig,
822 clock_fn: C,
823 db_exec: Arc<dyn DbExecutor>,
824 worker: Arc<W>,
825 executed_at: DateTime<Utc>,
826 ) -> Vec<ExecutionId> {
827 trace!("Ticking with {worker:?}");
828 let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
829 let executor = ExecTask::new_test(worker, config, clock_fn, db_exec, ffqns);
830 executor
831 .tick_test_await(executed_at, RunId::generate())
832 .await
833 }
834
835 #[tokio::test]
836 async fn execute_simple_lifecycle_tick_based_mem() {
837 let created_at = Now.now();
838 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
839 execute_simple_lifecycle_tick_based(
840 db_pool.connection().as_ref(),
841 db_exec.clone(),
842 ConstClock(created_at),
843 )
844 .await;
845 db_close.close().await;
846 }
847
848 #[tokio::test]
849 async fn execute_simple_lifecycle_tick_based_sqlite() {
850 let created_at = Now.now();
851 let (_guard, db_pool, db_exec, db_close) = Database::Sqlite.set_up().await;
852 execute_simple_lifecycle_tick_based(
853 db_pool.connection().as_ref(),
854 db_exec.clone(),
855 ConstClock(created_at),
856 )
857 .await;
858 db_close.close().await;
859 }
860
861 async fn execute_simple_lifecycle_tick_based<C: ClockFn + 'static>(
862 db_connection: &dyn DbConnection,
863 db_exec: Arc<dyn DbExecutor>,
864 clock_fn: C,
865 ) {
866 set_up();
867 let created_at = clock_fn.now();
868 let exec_config = ExecConfig {
869 batch_size: 1,
870 lock_expiry: Duration::from_secs(1),
871 tick_sleep: Duration::from_millis(100),
872 component_id: ComponentId::dummy_activity(),
873 task_limiter: None,
874 executor_id: ExecutorId::generate(),
875 retry_config: ComponentRetryConfig::ZERO,
876 };
877
878 let execution_log = create_and_tick(
879 CreateAndTickConfig {
880 execution_id: ExecutionId::generate(),
881 created_at,
882 executed_at: created_at,
883 },
884 clock_fn,
885 db_connection,
886 db_exec,
887 exec_config,
888 Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
889 SUPPORTED_RETURN_VALUE_OK_EMPTY,
890 Version::new(2),
891 None,
892 ))),
893 tick_fn,
894 )
895 .await;
896 assert_matches!(
897 execution_log.events.get(2).unwrap(),
898 ExecutionEvent {
899 event: ExecutionEventInner::Finished {
900 result: SupportedFunctionReturnValue::Ok { ok: None },
901 http_client_traces: None
902 },
903 created_at: _,
904 backtrace_id: None,
905 version: Version(2),
906 }
907 );
908 }
909
910 #[tokio::test]
911 async fn execute_simple_lifecycle_task_based_mem() {
912 set_up();
913 let created_at = Now.now();
914 let clock_fn = ConstClock(created_at);
915 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
916 let exec_config = ExecConfig {
917 batch_size: 1,
918 lock_expiry: Duration::from_secs(1),
919 tick_sleep: Duration::ZERO,
920 component_id: ComponentId::dummy_activity(),
921 task_limiter: None,
922 executor_id: ExecutorId::generate(),
923 retry_config: ComponentRetryConfig::ZERO,
924 };
925
926 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
927 SUPPORTED_RETURN_VALUE_OK_EMPTY,
928 Version::new(2),
929 None,
930 )));
931
932 let execution_log = create_and_tick(
933 CreateAndTickConfig {
934 execution_id: ExecutionId::generate(),
935 created_at,
936 executed_at: created_at,
937 },
938 clock_fn,
939 db_pool.connection().as_ref(),
940 db_exec,
941 exec_config,
942 worker,
943 tick_fn,
944 )
945 .await;
946 assert_matches!(
947 execution_log.events.get(2).unwrap(),
948 ExecutionEvent {
949 event: ExecutionEventInner::Finished {
950 result: SupportedFunctionReturnValue::Ok { ok: None },
951 http_client_traces: None
952 },
953 created_at: _,
954 backtrace_id: None,
955 version: Version(2),
956 }
957 );
958 db_close.close().await;
959 }
960
    /// Inputs for `create_and_tick`.
    struct CreateAndTickConfig {
        execution_id: ExecutionId,
        /// Used as both the creation time and the scheduling time of the execution.
        created_at: DateTime<Utc>,
        /// Timestamp passed to the tick function.
        executed_at: DateTime<Utc>,
    }
966
967 async fn create_and_tick<
968 W: Worker,
969 C: ClockFn,
970 T: FnMut(ExecConfig, C, Arc<dyn DbExecutor>, Arc<W>, DateTime<Utc>) -> F,
971 F: Future<Output = Vec<ExecutionId>>,
972 >(
973 config: CreateAndTickConfig,
974 clock_fn: C,
975 db_connection: &dyn DbConnection,
976 db_exec: Arc<dyn DbExecutor>,
977 exec_config: ExecConfig,
978 worker: Arc<W>,
979 mut tick: T,
980 ) -> ExecutionLog {
981 db_connection
983 .create(CreateRequest {
984 created_at: config.created_at,
985 execution_id: config.execution_id.clone(),
986 ffqn: FFQN_SOME,
987 params: Params::empty(),
988 parent: None,
989 metadata: concepts::ExecutionMetadata::empty(),
990 scheduled_at: config.created_at,
991 component_id: ComponentId::dummy_activity(),
992 scheduled_by: None,
993 })
994 .await
995 .unwrap();
996 tick(exec_config, clock_fn, db_exec, worker, config.executed_at).await;
998 let execution_log = db_connection.get(&config.execution_id).await.unwrap();
999 debug!("Execution history after tick: {execution_log:?}");
1000 assert_matches!(
1002 execution_log.events.first().unwrap(),
1003 ExecutionEvent {
1004 event: ExecutionEventInner::Created { .. },
1005 created_at: actually_created_at,
1006 backtrace_id: None,
1007 version: Version(0),
1008 }
1009 if config.created_at == *actually_created_at
1010 );
1011 let locked_at = assert_matches!(
1012 execution_log.events.get(1).unwrap(),
1013 ExecutionEvent {
1014 event: ExecutionEventInner::Locked { .. },
1015 created_at: locked_at,
1016 backtrace_id: None,
1017 version: Version(1),
1018 } if config.created_at <= *locked_at
1019 => *locked_at
1020 );
1021 assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
1022 event: _,
1023 created_at: executed_at,
1024 backtrace_id: None,
1025 version: Version(2),
1026 } if *executed_at >= locked_at);
1027 execution_log
1028 }
1029
    /// With `max_retries: 1`, an activity trap must produce `TemporarilyFailed` (version 2)
    /// whose backoff expires `retry_exp_backoff` after the failure. Ticking again before the
    /// backoff elapses must lock nothing. After the sim clock moves past the backoff, the
    /// execution must be locked again (version 3) and finish successfully (version 4).
    #[tokio::test]
    async fn activity_trap_should_trigger_an_execution_retry() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
        let retry_exp_backoff = Duration::from_millis(100);
        let retry_config = ComponentRetryConfig {
            max_retries: 1,
            retry_exp_backoff,
        };
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
            retry_config,
        };
        let expected_reason = "error reason";
        let expected_detail = "error detail";
        // First run: the worker reports a trap at version 2.
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: Some(expected_detail.to_string()),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at: sim_clock.now(),
                executed_at: sim_clock.now(),
            },
            sim_clock.clone(),
            db_pool.connection().as_ref(),
            db_exec.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        // Created, Locked, TemporarilyFailed.
        assert_eq!(3, execution_log.events.len());
        {
            let (reason, detail, at, expires_at) = assert_matches!(
                &execution_log.events.get(2).unwrap(),
                ExecutionEvent {
                    event: ExecutionEventInner::TemporarilyFailed {
                        reason,
                        detail,
                        backoff_expires_at,
                        http_client_traces: None,
                    },
                    created_at: at,
                    backtrace_id: None,
                    version: Version(2),
                }
                => (reason, detail, *at, *backoff_expires_at)
            );
            assert_eq!(format!("activity trap: {expected_reason}"), reason.deref());
            assert_eq!(Some(expected_detail), detail.as_deref());
            assert_eq!(at, sim_clock.now());
            assert_eq!(sim_clock.now() + retry_config.retry_exp_backoff, expires_at);
        }
        // Second run: expected at version 4 (after the re-lock at version 3), returns Ok.
        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
            std::sync::Mutex::new(IndexMap::from([(
                Version::new(4),
                (
                    vec![],
                    WorkerResult::Ok(SUPPORTED_RETURN_VALUE_OK_EMPTY, Version::new(4), None),
                ),
            )])),
        )));
        // The backoff has not elapsed yet, so nothing may be locked.
        assert!(
            tick_fn(
                exec_config.clone(),
                sim_clock.clone(),
                db_exec.clone(),
                worker.clone(),
                sim_clock.now(),
            )
            .await
            .is_empty()
        );
        sim_clock.move_time_forward(retry_config.retry_exp_backoff);
        // Once the backoff has elapsed, the execution is locked and run again.
        tick_fn(
            exec_config,
            sim_clock.clone(),
            db_exec.clone(),
            worker,
            sim_clock.now(),
        )
        .await;
        let execution_log = {
            let db_connection = db_pool.connection();
            db_connection
                .get(&execution_log.execution_id)
                .await
                .unwrap()
        };
        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
        assert_matches!(
            execution_log.events.get(3).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: at,
                backtrace_id: None,
                version: Version(3),
            } if *at == sim_clock.now()
        );
        assert_matches!(
            execution_log.events.get(4).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: SupportedFunctionReturnValue::Ok{ok:None},
                    http_client_traces: None
                },
                created_at: finished_at,
                backtrace_id: None,
                version: Version(4),
            } if *finished_at == sim_clock.now()
        );
        db_close.close().await;
    }
1159
1160 #[tokio::test]
1161 async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1162 set_up();
1163 let created_at = Now.now();
1164 let clock_fn = ConstClock(created_at);
1165 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1166 let exec_config = ExecConfig {
1167 batch_size: 1,
1168 lock_expiry: Duration::from_secs(1),
1169 tick_sleep: Duration::ZERO,
1170 component_id: ComponentId::dummy_activity(),
1171 task_limiter: None,
1172 executor_id: ExecutorId::generate(),
1173 retry_config: ComponentRetryConfig::ZERO,
1174 };
1175
1176 let reason = "error reason";
1177 let expected_reason = format!("activity trap: {reason}");
1178 let expected_detail = "error detail";
1179 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1180 WorkerError::ActivityTrap {
1181 reason: reason.to_string(),
1182 trap_kind: concepts::TrapKind::Trap,
1183 detail: Some(expected_detail.to_string()),
1184 version: Version::new(2),
1185 http_client_traces: None,
1186 },
1187 )));
1188 let execution_log = create_and_tick(
1189 CreateAndTickConfig {
1190 execution_id: ExecutionId::generate(),
1191 created_at,
1192 executed_at: created_at,
1193 },
1194 clock_fn,
1195 db_pool.connection().as_ref(),
1196 db_exec,
1197 exec_config.clone(),
1198 worker,
1199 tick_fn,
1200 )
1201 .await;
1202 assert_eq!(3, execution_log.events.len());
1203 let (reason, kind, detail) = assert_matches!(
1204 &execution_log.events.get(2).unwrap(),
1205 ExecutionEvent {
1206 event: ExecutionEventInner::Finished{
1207 result: SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError{reason, kind, detail}),
1208 http_client_traces: None
1209 },
1210 created_at: at,
1211 backtrace_id: None,
1212 version: Version(2),
1213 } if *at == created_at
1214 => (reason, kind, detail)
1215 );
1216
1217 assert_eq!(Some(expected_reason), *reason);
1218 assert_eq!(Some(expected_detail), detail.as_deref());
1219 assert_eq!(ExecutionFailureKind::Uncategorized, *kind);
1220
1221 db_close.close().await;
1222 }
1223
1224 #[tokio::test]
1225 async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1226 let worker_error = WorkerError::ActivityTrap {
1227 reason: "error reason".to_string(),
1228 trap_kind: TrapKind::Trap,
1229 detail: Some("detail".to_string()),
1230 version: Version::new(2),
1231 http_client_traces: None,
1232 };
1233 let expected_child_err = FinishedExecutionError {
1234 kind: ExecutionFailureKind::Uncategorized,
1235 reason: Some("activity trap: error reason".to_string()),
1236 detail: Some("detail".to_string()),
1237 };
1238 child_execution_permanently_failed_should_notify_parent(
1239 WorkerResult::Err(worker_error),
1240 expected_child_err,
1241 )
1242 .await;
1243 }
1244
1245 #[tokio::test]
1246 async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1247 let expected_child_err = FinishedExecutionError {
1248 kind: ExecutionFailureKind::TimedOut,
1249 reason: None,
1250 detail: None,
1251 };
1252 child_execution_permanently_failed_should_notify_parent(
1253 WorkerResult::DbUpdatedByWorkerOrWatcher,
1254 expected_child_err,
1255 )
1256 .await;
1257 }
1258
1259 async fn child_execution_permanently_failed_should_notify_parent(
1260 worker_result: WorkerResult,
1261 expected_child_err: FinishedExecutionError,
1262 ) {
1263 use concepts::storage::JoinSetResponseEventOuter;
1264 const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1265
1266 set_up();
1267 let sim_clock = SimClock::default();
1268 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1269
1270 let parent_worker = Arc::new(SimpleWorker::with_single_result(
1271 WorkerResult::DbUpdatedByWorkerOrWatcher,
1272 ));
1273 let parent_execution_id = ExecutionId::generate();
1274 db_pool
1275 .connection()
1276 .create(CreateRequest {
1277 created_at: sim_clock.now(),
1278 execution_id: parent_execution_id.clone(),
1279 ffqn: FFQN_SOME,
1280 params: Params::empty(),
1281 parent: None,
1282 metadata: concepts::ExecutionMetadata::empty(),
1283 scheduled_at: sim_clock.now(),
1284 component_id: ComponentId::dummy_activity(),
1285 scheduled_by: None,
1286 })
1287 .await
1288 .unwrap();
1289 let parent_executor_id = ExecutorId::generate();
1290 tick_fn(
1291 ExecConfig {
1292 batch_size: 1,
1293 lock_expiry: LOCK_EXPIRY,
1294 tick_sleep: Duration::ZERO,
1295 component_id: ComponentId::dummy_activity(),
1296 task_limiter: None,
1297 executor_id: parent_executor_id,
1298 retry_config: ComponentRetryConfig::ZERO,
1299 },
1300 sim_clock.clone(),
1301 db_exec.clone(),
1302 parent_worker,
1303 sim_clock.now(),
1304 )
1305 .await;
1306
1307 let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1308 let child_execution_id = parent_execution_id.next_level(&join_set_id);
1309 {
1311 let params = Params::empty();
1312 let child = CreateRequest {
1313 created_at: sim_clock.now(),
1314 execution_id: ExecutionId::Derived(child_execution_id.clone()),
1315 ffqn: FFQN_CHILD,
1316 params: params.clone(),
1317 parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1318 metadata: concepts::ExecutionMetadata::empty(),
1319 scheduled_at: sim_clock.now(),
1320 component_id: ComponentId::dummy_activity(),
1321 scheduled_by: None,
1322 };
1323 let current_time = sim_clock.now();
1324 let join_set = AppendRequest {
1325 created_at: current_time,
1326 event: ExecutionEventInner::HistoryEvent {
1327 event: HistoryEvent::JoinSetCreate {
1328 join_set_id: join_set_id.clone(),
1329 },
1330 },
1331 };
1332 let child_exec_req = AppendRequest {
1333 created_at: current_time,
1334 event: ExecutionEventInner::HistoryEvent {
1335 event: HistoryEvent::JoinSetRequest {
1336 join_set_id: join_set_id.clone(),
1337 request: JoinSetRequest::ChildExecutionRequest {
1338 child_execution_id: child_execution_id.clone(),
1339 target_ffqn: FFQN_CHILD,
1340 params,
1341 },
1342 },
1343 },
1344 };
1345 let join_next = AppendRequest {
1346 created_at: current_time,
1347 event: ExecutionEventInner::HistoryEvent {
1348 event: HistoryEvent::JoinNext {
1349 join_set_id: join_set_id.clone(),
1350 run_expires_at: sim_clock.now(),
1351 closing: false,
1352 requested_ffqn: Some(FFQN_CHILD),
1353 },
1354 },
1355 };
1356 db_pool
1357 .connection()
1358 .append_batch_create_new_execution(
1359 current_time,
1360 vec![join_set, child_exec_req, join_next],
1361 parent_execution_id.clone(),
1362 Version::new(2),
1363 vec![child],
1364 )
1365 .await
1366 .unwrap();
1367 }
1368
1369 let child_worker =
1370 Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));
1371
1372 tick_fn(
1374 ExecConfig {
1375 batch_size: 1,
1376 lock_expiry: LOCK_EXPIRY,
1377 tick_sleep: Duration::ZERO,
1378 component_id: ComponentId::dummy_activity(),
1379 task_limiter: None,
1380 executor_id: ExecutorId::generate(),
1381 retry_config: ComponentRetryConfig::ZERO,
1382 },
1383 sim_clock.clone(),
1384 db_exec.clone(),
1385 child_worker,
1386 sim_clock.now(),
1387 )
1388 .await;
1389 if matches!(expected_child_err.kind, ExecutionFailureKind::TimedOut) {
1390 sim_clock.move_time_forward(LOCK_EXPIRY);
1392 expired_timers_watcher::tick(db_pool.connection().as_ref(), sim_clock.now())
1393 .await
1394 .unwrap();
1395 }
1396 let child_log = db_pool
1397 .connection()
1398 .get(&ExecutionId::Derived(child_execution_id.clone()))
1399 .await
1400 .unwrap();
1401 assert!(child_log.pending_state.is_finished());
1402 assert_eq!(
1403 Version(2),
1404 child_log.next_version,
1405 "created = 0, locked = 1, with_single_result = 2"
1406 );
1407 assert_eq!(
1408 ExecutionEventInner::Finished {
1409 result: SupportedFunctionReturnValue::ExecutionError(expected_child_err),
1410 http_client_traces: None
1411 },
1412 child_log.last_event().event
1413 );
1414 let parent_log = db_pool
1415 .connection()
1416 .get(&parent_execution_id)
1417 .await
1418 .unwrap();
1419 assert_matches!(
1420 parent_log.pending_state,
1421 PendingState::PendingAt {
1422 scheduled_at,
1423 last_lock: Some(LockedBy { executor_id: found_executor_id, run_id: _}),
1424 } if scheduled_at == sim_clock.now() && found_executor_id == parent_executor_id,
1425 "parent should be back to pending"
1426 );
1427 let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1428 parent_log.responses.last(),
1429 Some(JoinSetResponseEventOuter{
1430 created_at: at,
1431 event: JoinSetResponseEvent{
1432 join_set_id: found_join_set_id,
1433 event: JoinSetResponse::ChildExecutionFinished {
1434 child_execution_id: found_child_execution_id,
1435 finished_version,
1436 result: found_result,
1437 }
1438 }
1439 })
1440 if *at == sim_clock.now()
1441 => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1442 );
1443 assert_eq!(join_set_id, *found_join_set_id);
1444 assert_eq!(child_execution_id, *found_child_execution_id);
1445 assert_eq!(child_log.next_version, *child_finished_version);
1446 assert_matches!(
1447 found_result,
1448 SupportedFunctionReturnValue::ExecutionError(_)
1449 );
1450
1451 db_close.close().await;
1452 }
1453
1454 #[derive(Clone, Debug)]
1455 struct SleepyWorker {
1456 duration: Duration,
1457 result: SupportedFunctionReturnValue,
1458 exported: [FunctionMetadata; 1],
1459 }
1460
1461 #[async_trait]
1462 impl Worker for SleepyWorker {
1463 async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1464 tokio::time::sleep(self.duration).await;
1465 WorkerResult::Ok(self.result.clone(), ctx.version, None)
1466 }
1467
1468 fn exported_functions(&self) -> &[FunctionMetadata] {
1469 &self.exported
1470 }
1471 }
1472
1473 #[tokio::test]
1474 async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1475 set_up();
1476 let sim_clock = SimClock::default();
1477 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1478 let lock_expiry = Duration::from_millis(100);
1479 let timeout_duration = Duration::from_millis(300);
1480 let retry_config = ComponentRetryConfig {
1481 max_retries: 1,
1482 retry_exp_backoff: timeout_duration,
1483 };
1484 let exec_config = ExecConfig {
1485 batch_size: 1,
1486 lock_expiry,
1487 tick_sleep: Duration::ZERO,
1488 component_id: ComponentId::dummy_activity(),
1489 task_limiter: None,
1490 executor_id: ExecutorId::generate(),
1491 retry_config,
1492 };
1493
1494 let worker = Arc::new(SleepyWorker {
1495 duration: lock_expiry + Duration::from_millis(1), result: SUPPORTED_RETURN_VALUE_OK_EMPTY,
1497 exported: [FunctionMetadata {
1498 ffqn: FFQN_SOME,
1499 parameter_types: ParameterTypes::default(),
1500 return_type: RETURN_TYPE_DUMMY,
1501 extension: None,
1502 submittable: true,
1503 }],
1504 });
1505 let execution_id = ExecutionId::generate();
1507 let db_connection = db_pool.connection();
1508 db_connection
1509 .create(CreateRequest {
1510 created_at: sim_clock.now(),
1511 execution_id: execution_id.clone(),
1512 ffqn: FFQN_SOME,
1513 params: Params::empty(),
1514 parent: None,
1515 metadata: concepts::ExecutionMetadata::empty(),
1516 scheduled_at: sim_clock.now(),
1517 component_id: ComponentId::dummy_activity(),
1518 scheduled_by: None,
1519 })
1520 .await
1521 .unwrap();
1522
1523 let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
1524 let executor = ExecTask::new_test(
1525 worker,
1526 exec_config.clone(),
1527 sim_clock.clone(),
1528 db_exec.clone(),
1529 ffqns,
1530 );
1531 let mut first_execution_progress = executor
1532 .tick(sim_clock.now(), RunId::generate())
1533 .await
1534 .unwrap();
1535 assert_eq!(1, first_execution_progress.executions.len());
1536 sim_clock.move_time_forward(lock_expiry);
1538 let now_after_first_lock_expiry = sim_clock.now();
1540 {
1541 debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1542 let cleanup_progress = executor
1543 .tick(now_after_first_lock_expiry, RunId::generate())
1544 .await
1545 .unwrap();
1546 assert!(cleanup_progress.executions.is_empty());
1547 }
1548 {
1549 let expired_locks = expired_timers_watcher::tick(
1550 db_pool.connection().as_ref(),
1551 now_after_first_lock_expiry,
1552 )
1553 .await
1554 .unwrap()
1555 .expired_locks;
1556 assert_eq!(1, expired_locks);
1557 }
1558 assert!(
1559 !first_execution_progress
1560 .executions
1561 .pop()
1562 .unwrap()
1563 .1
1564 .is_finished()
1565 );
1566
1567 let execution_log = db_connection.get(&execution_id).await.unwrap();
1568 let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1569 assert_matches!(
1570 &execution_log.events.get(2).unwrap(),
1571 ExecutionEvent {
1572 event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
1573 created_at: at,
1574 backtrace_id: None,
1575 version: Version(2),
1576 } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1577 );
1578 assert_matches!(
1579 execution_log.pending_state,
1580 PendingState::PendingAt {
1581 scheduled_at: found_scheduled_by,
1582 last_lock: Some(LockedBy {
1583 executor_id: found_executor_id,
1584 run_id: _,
1585 }),
1586 } if found_scheduled_by == expected_first_timeout_expiry && found_executor_id == exec_config.executor_id
1587 );
1588 sim_clock.move_time_forward(timeout_duration);
1589 let now_after_first_timeout = sim_clock.now();
1590 debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1591
1592 let mut second_execution_progress = executor
1593 .tick(now_after_first_timeout, RunId::generate())
1594 .await
1595 .unwrap();
1596 assert_eq!(1, second_execution_progress.executions.len());
1597
1598 sim_clock.move_time_forward(lock_expiry);
1600 let now_after_second_lock_expiry = sim_clock.now();
1602 debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1603 {
1604 let cleanup_progress = executor
1605 .tick(now_after_second_lock_expiry, RunId::generate())
1606 .await
1607 .unwrap();
1608 assert!(cleanup_progress.executions.is_empty());
1609 }
1610 {
1611 let expired_locks = expired_timers_watcher::tick(
1612 db_pool.connection().as_ref(),
1613 now_after_second_lock_expiry,
1614 )
1615 .await
1616 .unwrap()
1617 .expired_locks;
1618 assert_eq!(1, expired_locks);
1619 }
1620 assert!(
1621 !second_execution_progress
1622 .executions
1623 .pop()
1624 .unwrap()
1625 .1
1626 .is_finished()
1627 );
1628
1629 drop(db_connection);
1630 drop(executor);
1631 db_close.close().await;
1632 }
1633}