1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::prefixed_ulid::RunId;
5use concepts::storage::{
6 AppendEventsToExecution, AppendRequest, AppendResponseToExecution, DbErrorGeneric,
7 DbErrorWrite, DbExecutor, ExecutionLog, LockedExecution,
8};
9use concepts::time::{ClockFn, Sleep};
10use concepts::{
11 ComponentId, ComponentRetryConfig, FunctionMetadata, StrVariant, SupportedFunctionReturnValue,
12};
13use concepts::{ExecutionFailureKind, JoinSetId};
14use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
15use concepts::{
16 FinishedExecutionError,
17 storage::{ExecutionEventInner, Version},
18};
19use std::{
20 sync::{
21 Arc,
22 atomic::{AtomicBool, Ordering},
23 },
24 time::Duration,
25};
26use tokio::task::{AbortHandle, JoinHandle};
27use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
28
/// Configuration of a single executor task.
#[derive(Debug, Clone)]
pub struct ExecConfig {
    /// Added to the tick's `executed_at` to compute when a newly acquired lock expires.
    pub lock_expiry: Duration,
    /// Duration handed to the `Sleep` implementation while the spawned executor waits for
    /// pending executions between ticks.
    pub tick_sleep: Duration,
    /// Upper bound on the number of executions requested from the database in one tick.
    pub batch_size: u32,
    /// Component id passed to the database when locking; also recorded in tracing spans.
    pub component_id: ComponentId,
    /// Optional semaphore; when set, every running worker task holds one permit until it
    /// finishes. When `None`, only `batch_size` limits a tick.
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
    /// Executor id passed to the database when locking; also recorded in tracing spans.
    pub executor_id: ExecutorId,
    /// Retry settings (`max_retries`, `retry_exp_backoff`) used when a worker run fails.
    pub retry_config: ComponentRetryConfig,
}
39
/// Locks pending executions in the database and runs each of them on a [`Worker`].
pub struct ExecTask<C: ClockFn> {
    /// Worker invoked for every locked execution.
    worker: Arc<dyn Worker>,
    config: ExecConfig,
    /// Clock used to timestamp worker results.
    clock_fn: C,
    /// Database handle used for locking executions and appending events.
    db_exec: Arc<dyn DbExecutor>,
    /// Function names passed to the database when locking pending executions.
    ffqns: Arc<[FunctionFqn]>,
}
47
/// Executions started by one tick, paired with the join handles of their spawned tasks.
#[derive(derive_more::Debug, Default)]
pub struct ExecutionProgress {
    // Only read by `wait_for_tasks`, which is compiled only with the `test` feature.
    #[debug(skip)]
    #[allow(dead_code)]
    executions: Vec<(ExecutionId, JoinHandle<()>)>,
}
54
55impl ExecutionProgress {
56 #[cfg(feature = "test")]
57 pub async fn wait_for_tasks(self) -> Vec<ExecutionId> {
58 let mut vec = Vec::new();
59 for (exe, join_handle) in self.executions {
60 vec.push(exe);
61 join_handle.await.unwrap();
62 }
63 vec
64 }
65}
66
/// Handle to a spawned executor task; closing it stops the loop, dropping it aborts the task.
#[derive(derive_more::Debug)]
pub struct ExecutorTaskHandle {
    /// Flag shared with the executor loop; the loop exits once it reads `true`.
    #[debug(skip)]
    is_closing: Arc<AtomicBool>,
    /// Used to check whether the task has finished and to abort it on drop.
    #[debug(skip)]
    abort_handle: AbortHandle,
    component_id: ComponentId,
    executor_id: ExecutorId,
}
76
77impl ExecutorTaskHandle {
78 #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
79 pub async fn close(&self) {
80 trace!("Gracefully closing");
81 self.is_closing.store(true, Ordering::Relaxed);
89 while !self.abort_handle.is_finished() {
90 tokio::time::sleep(Duration::from_millis(1)).await;
91 }
92 debug!("Gracefully closed");
93 }
94}
95
96impl Drop for ExecutorTaskHandle {
97 #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
98 fn drop(&mut self) {
99 if self.abort_handle.is_finished() {
100 return;
101 }
102 warn!("Aborting the executor task");
103 self.abort_handle.abort();
104 }
105}
106
/// Public wrapper around `extract_exported_ffqns_noext`, compiled only with the `test` feature.
#[cfg(feature = "test")]
pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    extract_exported_ffqns_noext(worker)
}
111
112fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
113 worker
114 .exported_functions()
115 .iter()
116 .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
117 .collect::<Arc<_>>()
118}
119
120impl<C: ClockFn + 'static> ExecTask<C> {
    /// Builds an executor task from its parts without spawning anything.
    ///
    /// Only compiled with the `test` feature; `ffqns` is the list of functions the task
    /// will ask the database to lock.
    #[cfg(feature = "test")]
    pub fn new_test(
        worker: Arc<dyn Worker>,
        config: ExecConfig,
        clock_fn: C,
        db_exec: Arc<dyn DbExecutor>,
        ffqns: Arc<[FunctionFqn]>,
    ) -> Self {
        Self {
            worker,
            config,
            clock_fn,
            db_exec,
            ffqns,
        }
    }
137
138 #[cfg(feature = "test")]
139 pub fn new_all_ffqns_test(
140 worker: Arc<dyn Worker>,
141 config: ExecConfig,
142 clock_fn: C,
143 db_exec: Arc<dyn DbExecutor>,
144 ) -> Self {
145 let ffqns = extract_exported_ffqns_noext(worker.as_ref());
146 Self {
147 worker,
148 config,
149 clock_fn,
150 db_exec,
151 ffqns,
152 }
153 }
154
    /// Spawns the executor loop on the tokio runtime and returns a handle controlling it.
    ///
    /// The loop alternates between one `tick` and one `wait_for_pending` call until the
    /// closing flag stored in the returned handle is set. The functions to lock are taken
    /// from the worker's exported functions.
    pub fn spawn_new(
        worker: Arc<dyn Worker>,
        config: ExecConfig,
        clock_fn: C,
        db_exec: Arc<dyn DbExecutor>,
        sleep: impl Sleep + 'static,
    ) -> ExecutorTaskHandle {
        let is_closing = Arc::new(AtomicBool::default());
        let is_closing_inner = is_closing.clone();
        let ffqns = extract_exported_ffqns_noext(worker.as_ref());
        // Copied out before `config` is moved into the spawned task.
        let component_id = config.component_id.clone();
        let executor_id = config.executor_id;
        let abort_handle = tokio::spawn(async move {
            debug!(executor_id = %config.executor_id, component_id = %config.component_id, "Spawned executor");
            let task = ExecTask {
                worker,
                config,
                db_exec,
                ffqns: ffqns.clone(),
                clock_fn: clock_fn.clone(),
            };
            // The flag is only checked between iterations.
            while !is_closing_inner.load(Ordering::Relaxed) {
                // The tick result is discarded, so a failing tick does not stop the loop.
                let _ = task.tick(clock_fn.now(), RunId::generate()).await;

                // The boxed future sleeps for `tick_sleep`.
                // NOTE(review): presumably `wait_for_pending` uses it as a timeout — confirm.
                task.db_exec
                    .wait_for_pending(clock_fn.now(), ffqns.clone(), {
                        let sleep = sleep.clone();
                        Box::pin(async move { sleep.sleep(task.config.tick_sleep).await })
                    })
                    .await;
            }
        })
        .abort_handle();
        ExecutorTaskHandle {
            is_closing,
            abort_handle,
            component_id,
            executor_id,
        }
    }
194
195 fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
196 if let Some(task_limiter) = &self.config.task_limiter {
197 let mut locks = Vec::new();
198 for _ in 0..self.config.batch_size {
199 if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
200 locks.push(Some(permit));
201 } else {
202 break;
203 }
204 }
205 locks
206 } else {
207 let mut vec = Vec::with_capacity(self.config.batch_size as usize);
208 for _ in 0..self.config.batch_size {
209 vec.push(None);
210 }
211 vec
212 }
213 }
214
    /// Runs a single tick and returns its progress without awaiting the spawned workers.
    ///
    /// # Panics
    /// Panics if the tick returns a database error.
    #[cfg(feature = "test")]
    pub async fn tick_test(&self, executed_at: DateTime<Utc>, run_id: RunId) -> ExecutionProgress {
        self.tick(executed_at, run_id).await.unwrap()
    }
219
220 #[cfg(feature = "test")]
221 pub async fn tick_test_await(
222 &self,
223 executed_at: DateTime<Utc>,
224 run_id: RunId,
225 ) -> Vec<ExecutionId> {
226 self.tick(executed_at, run_id)
227 .await
228 .unwrap()
229 .wait_for_tasks()
230 .await
231 }
232
233 #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.config.executor_id, component_id = %self.config.component_id))]
234 async fn tick(
235 &self,
236 executed_at: DateTime<Utc>,
237 run_id: RunId,
238 ) -> Result<ExecutionProgress, DbErrorGeneric> {
239 let locked_executions = {
240 let mut permits = self.acquire_task_permits();
241 if permits.is_empty() {
242 return Ok(ExecutionProgress::default());
243 }
244 let lock_expires_at = executed_at + self.config.lock_expiry;
245 let locked_executions = self
246 .db_exec
247 .lock_pending(
248 permits.len(), executed_at, self.ffqns.clone(),
251 executed_at, self.config.component_id.clone(),
253 self.config.executor_id,
254 lock_expires_at,
255 run_id,
256 self.config.retry_config,
257 )
258 .await?;
259 while permits.len() > locked_executions.len() {
261 permits.pop();
262 }
263 assert_eq!(permits.len(), locked_executions.len());
264 locked_executions.into_iter().zip(permits)
265 };
266
267 let mut executions = Vec::with_capacity(locked_executions.len());
268 for (locked_execution, permit) in locked_executions {
269 let execution_id = locked_execution.execution_id.clone();
270 let join_handle = {
271 let worker = self.worker.clone();
272 let db = self.db_exec.clone();
273 let clock_fn = self.clock_fn.clone();
274 let worker_span = info_span!(parent: None, "worker",
275 "otel.name" = format!("worker {}", locked_execution.ffqn),
276 %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.config.executor_id, component_id = %self.config.component_id);
277 locked_execution.metadata.enrich(&worker_span);
278 tokio::spawn({
279 let worker_span2 = worker_span.clone();
280 let retry_config = self.config.retry_config;
281 async move {
282 let _permit = permit;
283 let res = Self::run_worker(
284 worker,
285 db.as_ref(),
286 clock_fn,
287 locked_execution,
288 retry_config,
289 worker_span2,
290 )
291 .await;
292 if let Err(db_error) = res {
293 error!("Got db error `{db_error:?}`, expecting watcher to mark execution as timed out");
294 }
295 }
296 .instrument(worker_span)
297 })
298 };
299 executions.push((execution_id, join_handle));
300 }
301 Ok(ExecutionProgress { executions })
302 }
303
304 async fn run_worker(
305 worker: Arc<dyn Worker>,
306 db_exec: &dyn DbExecutor,
307 clock_fn: C,
308 locked_execution: LockedExecution,
309 retry_config: ComponentRetryConfig,
310 worker_span: Span,
311 ) -> Result<(), DbErrorWrite> {
312 debug!("Worker::run starting");
313 trace!(
314 version = %locked_execution.next_version,
315 params = ?locked_execution.params,
316 event_history = ?locked_execution.event_history,
317 "Worker::run starting"
318 );
319 let can_be_retried = ExecutionLog::can_be_retried_after(
320 locked_execution.intermittent_event_count + 1,
321 retry_config.max_retries,
322 retry_config.retry_exp_backoff,
323 );
324 let unlock_expiry_on_limit_reached =
325 ExecutionLog::compute_retry_duration_when_retrying_forever(
326 locked_execution.intermittent_event_count + 1,
327 retry_config.retry_exp_backoff,
328 );
329 let ctx = WorkerContext {
330 execution_id: locked_execution.execution_id.clone(),
331 metadata: locked_execution.metadata,
332 ffqn: locked_execution.ffqn,
333 params: locked_execution.params,
334 event_history: locked_execution.event_history,
335 responses: locked_execution
336 .responses
337 .into_iter()
338 .map(|outer| outer.event)
339 .collect(),
340 version: locked_execution.next_version,
341 can_be_retried: can_be_retried.is_some(),
342 locked_event: locked_execution.locked_event,
343 worker_span,
344 };
345 let worker_result = worker.run(ctx).await;
346 trace!(?worker_result, "Worker::run finished");
347 let result_obtained_at = clock_fn.now();
348 match Self::worker_result_to_execution_event(
349 locked_execution.execution_id,
350 worker_result,
351 result_obtained_at,
352 locked_execution.parent,
353 can_be_retried,
354 unlock_expiry_on_limit_reached,
355 )? {
356 Some(append) => {
357 trace!("Appending {append:?}");
358 append.append(db_exec).await
359 }
360 None => Ok(()),
361 }
362 }
363
364 fn worker_result_to_execution_event(
366 execution_id: ExecutionId,
367 worker_result: WorkerResult,
368 result_obtained_at: DateTime<Utc>,
369 parent: Option<(ExecutionId, JoinSetId)>,
370 can_be_retried: Option<Duration>,
371 unlock_expiry_on_limit_reached: Duration,
372 ) -> Result<Option<Append>, DbErrorWrite> {
373 Ok(match worker_result {
374 WorkerResult::Ok(result, new_version, http_client_traces) => {
375 info!("Execution finished: {result}");
376 let child_finished =
377 parent.map(
378 |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
379 parent_execution_id,
380 parent_join_set,
381 result: result.clone(),
382 },
383 );
384 let primary_event = AppendRequest {
385 created_at: result_obtained_at,
386 event: ExecutionEventInner::Finished {
387 result,
388 http_client_traces,
389 },
390 };
391
392 Some(Append {
393 created_at: result_obtained_at,
394 primary_event,
395 execution_id,
396 version: new_version,
397 child_finished,
398 })
399 }
400 WorkerResult::DbUpdatedByWorkerOrWatcher => None,
401 WorkerResult::Err(err) => {
402 let reason_generic = err.to_string(); let (primary_event, child_finished, version) = match err {
405 WorkerError::TemporaryTimeout {
406 http_client_traces,
407 version,
408 } => {
409 if let Some(duration) = can_be_retried {
410 let backoff_expires_at = result_obtained_at + duration;
411 info!(
412 "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
413 );
414 (
415 ExecutionEventInner::TemporarilyTimedOut {
416 backoff_expires_at,
417 http_client_traces,
418 },
419 None,
420 version,
421 )
422 } else {
423 info!("Execution timed out");
424 let result = SupportedFunctionReturnValue::ExecutionError(
425 FinishedExecutionError {
426 kind: ExecutionFailureKind::TimedOut,
427 reason: None,
428 detail: None,
429 },
430 );
431 let child_finished =
432 parent.map(|(parent_execution_id, parent_join_set)| {
433 ChildFinishedResponse {
434 parent_execution_id,
435 parent_join_set,
436 result: result.clone(),
437 }
438 });
439 (
440 ExecutionEventInner::Finished {
441 result,
442 http_client_traces,
443 },
444 child_finished,
445 version,
446 )
447 }
448 }
449 WorkerError::DbError(db_error) => {
450 return Err(db_error);
451 }
452 WorkerError::ActivityTrap {
453 reason: _, trap_kind,
455 detail,
456 version,
457 http_client_traces,
458 } => {
459 if let Some(duration) = can_be_retried {
460 let expires_at = result_obtained_at + duration;
461 debug!(
462 "Retrying activity with `{trap_kind}` execution after {duration:?} at {expires_at}"
463 );
464 (
465 ExecutionEventInner::TemporarilyFailed {
466 reason: StrVariant::from(reason_generic),
467 backoff_expires_at: expires_at,
468 detail,
469 http_client_traces,
470 },
471 None,
472 version,
473 )
474 } else {
475 info!(
476 "Activity with `{trap_kind}` marked as permanent failure - {reason_generic}"
477 );
478 let result = SupportedFunctionReturnValue::ExecutionError(
479 FinishedExecutionError {
480 reason: Some(reason_generic),
481 kind: ExecutionFailureKind::Uncategorized,
482 detail,
483 },
484 );
485 let child_finished =
486 parent.map(|(parent_execution_id, parent_join_set)| {
487 ChildFinishedResponse {
488 parent_execution_id,
489 parent_join_set,
490 result: result.clone(),
491 }
492 });
493 (
494 ExecutionEventInner::Finished {
495 result,
496 http_client_traces,
497 },
498 child_finished,
499 version,
500 )
501 }
502 }
503 WorkerError::ActivityPreopenedDirError {
504 reason,
505 detail,
506 version,
507 } => {
508 let http_client_traces = None;
509 if let Some(duration) = can_be_retried {
510 let expires_at = result_obtained_at + duration;
511 debug!(
512 "Retrying activity with ActivityPreopenedDirError `{reason}` execution after {duration:?} at {expires_at}"
513 );
514 (
515 ExecutionEventInner::TemporarilyFailed {
516 reason: StrVariant::from(reason_generic),
517 backoff_expires_at: expires_at,
518 detail: Some(detail),
519 http_client_traces,
520 },
521 None,
522 version,
523 )
524 } else {
525 info!(
526 "Activity with ActivityPreopenedDirError `{reason}` marked as permanent failure - {reason_generic}"
527 );
528 let result = SupportedFunctionReturnValue::ExecutionError(
529 FinishedExecutionError {
530 reason: Some(reason_generic),
531 kind: ExecutionFailureKind::Uncategorized,
532 detail: Some(detail),
533 },
534 );
535 let child_finished =
536 parent.map(|(parent_execution_id, parent_join_set)| {
537 ChildFinishedResponse {
538 parent_execution_id,
539 parent_join_set,
540 result: result.clone(),
541 }
542 });
543 (
544 ExecutionEventInner::Finished {
545 result,
546 http_client_traces,
547 },
548 child_finished,
549 version,
550 )
551 }
552 }
553 WorkerError::ActivityReturnedError {
554 detail,
555 version,
556 http_client_traces,
557 } => {
558 let duration = can_be_retried.expect(
559 "ActivityReturnedError must not be returned when retries are exhausted",
560 );
561 let expires_at = result_obtained_at + duration;
562 debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
563 (
564 ExecutionEventInner::TemporarilyFailed {
565 backoff_expires_at: expires_at,
566 reason: StrVariant::Static("activity returned error"), detail, http_client_traces,
569 },
570 None,
571 version,
572 )
573 }
574 WorkerError::LimitReached {
575 reason,
576 version: new_version,
577 } => {
578 let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
579 warn!(
580 "Limit reached: {reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
581 );
582 (
583 ExecutionEventInner::Unlocked {
584 backoff_expires_at: expires_at,
585 reason: StrVariant::from(reason),
586 },
587 None,
588 new_version,
589 )
590 }
591 WorkerError::FatalError(fatal_error, version) => {
592 warn!("Fatal worker error - {fatal_error:?}");
593 let result = SupportedFunctionReturnValue::ExecutionError(
594 FinishedExecutionError::from(fatal_error),
595 );
596 let child_finished =
597 parent.map(|(parent_execution_id, parent_join_set)| {
598 ChildFinishedResponse {
599 parent_execution_id,
600 parent_join_set,
601 result: result.clone(),
602 }
603 });
604 (
605 ExecutionEventInner::Finished {
606 result,
607 http_client_traces: None,
608 },
609 child_finished,
610 version,
611 )
612 }
613 };
614 Some(Append {
615 created_at: result_obtained_at,
616 primary_event: AppendRequest {
617 created_at: result_obtained_at,
618 event: primary_event,
619 },
620 execution_id,
621 version,
622 child_finished,
623 })
624 }
625 })
626 }
627}
628
/// Result of a finished child execution together with the parent that must receive it.
#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    /// Execution that receives the response.
    pub(crate) parent_execution_id: ExecutionId,
    /// Join set of the parent that the response is written to.
    pub(crate) parent_join_set: JoinSetId,
    /// Return value of the finished child.
    pub(crate) result: SupportedFunctionReturnValue,
}
635
/// A pending database write: one event for an execution and, optionally, the response
/// for its parent.
#[derive(Debug, Clone)]
pub(crate) struct Append {
    /// Timestamp used for the parent response and passed to the batch write.
    pub(crate) created_at: DateTime<Utc>,
    /// Event appended to `execution_id`.
    pub(crate) primary_event: AppendRequest,
    pub(crate) execution_id: ExecutionId,
    /// Version passed to the database along with the event.
    pub(crate) version: Version,
    /// When set, `primary_event` must be a `Finished` event and `execution_id` must be a
    /// derived id; `append` asserts both.
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}
644
645impl Append {
646 pub(crate) async fn append(self, db_exec: &dyn DbExecutor) -> Result<(), DbErrorWrite> {
647 if let Some(child_finished) = self.child_finished {
648 assert_matches!(
649 &self.primary_event,
650 AppendRequest {
651 event: ExecutionEventInner::Finished { .. },
652 ..
653 }
654 );
655 let child_execution_id = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
656 let events = AppendEventsToExecution {
657 execution_id: self.execution_id,
658 version: self.version.clone(),
659 batch: vec![self.primary_event],
660 };
661 let response = AppendResponseToExecution {
662 parent_execution_id: child_finished.parent_execution_id,
663 created_at: self.created_at,
664 join_set_id: child_finished.parent_join_set,
665 child_execution_id,
666 finished_version: self.version, result: child_finished.result,
668 };
669
670 db_exec
671 .append_batch_respond_to_parent(events, response, self.created_at)
672 .await?;
673 } else {
674 db_exec
675 .append(self.execution_id, self.version, self.primary_event)
676 .await?;
677 }
678 Ok(())
679 }
680}
681
/// Scripted [`Worker`] implementation for tests.
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        FunctionFqn, FunctionMetadata, ParameterTypes, RETURN_TYPE_DUMMY,
        storage::{HistoryEvent, Version},
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    /// Function name used by workers that were not given an explicit one.
    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");

    /// Scripted runs keyed by the expected version; each run pops the last entry.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    /// Worker that returns pre-recorded results and asserts the context it is run with.
    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    /// Metadata of the single exported function with the given name.
    fn single_export(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
        [FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: RETURN_TYPE_DUMMY,
            extension: None,
            submittable: true,
        }]
    }

    impl SimpleWorker {
        /// Worker with exactly one scripted run, expected at version 2 with an empty
        /// event history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let mut scripted = IndexMap::new();
            scripted.insert(Version::new(2), (vec![], res));
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(scripted)))
        }

        /// Replaces the function name and the exported metadata, keeping the scripted runs.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            Self {
                exported: single_export(ffqn.clone()),
                ffqn,
                worker_results_rev: self.worker_results_rev,
            }
        }

        /// Worker exporting `FFQN_SOME` that replays the given scripted runs.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: single_export(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        /// Pops the last scripted run, checks that the version and the event history of
        /// `ctx` match it, and returns its result.
        ///
        /// Panics when no scripted run is left or when the context does not match.
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            let (expected_version, (expected_eh, worker_result)) =
                self.worker_results_rev.lock().unwrap().pop().unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            let actual_eh: Vec<_> = ctx
                .event_history
                .iter()
                .map(|(event, _version)| event.clone())
                .collect();
            assert_eq!(expected_eh, actual_eh);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }
    }
}
767
768#[cfg(test)]
769mod tests {
770 use self::simple_worker::SimpleWorker;
771 use super::*;
772 use crate::{expired_timers_watcher, worker::WorkerResult};
773 use assert_matches::assert_matches;
774 use async_trait::async_trait;
775 use concepts::storage::{
776 CreateRequest, DbConnection, JoinSetRequest, JoinSetResponse, JoinSetResponseEvent,
777 };
778 use concepts::storage::{DbPoolCloseable, LockedBy};
779 use concepts::storage::{ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState};
780 use concepts::time::Now;
781 use concepts::{
782 FunctionMetadata, JoinSetKind, ParameterTypes, Params, RETURN_TYPE_DUMMY,
783 SUPPORTED_RETURN_VALUE_OK_EMPTY, StrVariant, SupportedFunctionReturnValue, TrapKind,
784 };
785 use db_tests::Database;
786 use indexmap::IndexMap;
787 use simple_worker::FFQN_SOME;
788 use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
789 use test_utils::set_up;
790 use test_utils::sim_clock::{ConstClock, SimClock};
791
    // Function name `fn-child` in `pkg/ifc`.
    // NOTE(review): presumably used by the parent/child tests further down — confirm.
    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
793
794 async fn tick_fn<W: Worker + Debug, C: ClockFn + 'static>(
795 config: ExecConfig,
796 clock_fn: C,
797 db_exec: Arc<dyn DbExecutor>,
798 worker: Arc<W>,
799 executed_at: DateTime<Utc>,
800 ) -> Vec<ExecutionId> {
801 trace!("Ticking with {worker:?}");
802 let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
803 let executor = ExecTask::new_test(worker, config, clock_fn, db_exec, ffqns);
804 executor
805 .tick_test_await(executed_at, RunId::generate())
806 .await
807 }
808
809 #[tokio::test]
810 async fn execute_simple_lifecycle_tick_based_mem() {
811 let created_at = Now.now();
812 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
813 execute_simple_lifecycle_tick_based(
814 db_pool.connection().as_ref(),
815 db_exec.clone(),
816 ConstClock(created_at),
817 )
818 .await;
819 db_close.close().await;
820 }
821
822 #[tokio::test]
823 async fn execute_simple_lifecycle_tick_based_sqlite() {
824 let created_at = Now.now();
825 let (_guard, db_pool, db_exec, db_close) = Database::Sqlite.set_up().await;
826 execute_simple_lifecycle_tick_based(
827 db_pool.connection().as_ref(),
828 db_exec.clone(),
829 ConstClock(created_at),
830 )
831 .await;
832 db_close.close().await;
833 }
834
835 async fn execute_simple_lifecycle_tick_based<C: ClockFn + 'static>(
836 db_connection: &dyn DbConnection,
837 db_exec: Arc<dyn DbExecutor>,
838 clock_fn: C,
839 ) {
840 set_up();
841 let created_at = clock_fn.now();
842 let exec_config = ExecConfig {
843 batch_size: 1,
844 lock_expiry: Duration::from_secs(1),
845 tick_sleep: Duration::from_millis(100),
846 component_id: ComponentId::dummy_activity(),
847 task_limiter: None,
848 executor_id: ExecutorId::generate(),
849 retry_config: ComponentRetryConfig::ZERO,
850 };
851
852 let execution_log = create_and_tick(
853 CreateAndTickConfig {
854 execution_id: ExecutionId::generate(),
855 created_at,
856 executed_at: created_at,
857 },
858 clock_fn,
859 db_connection,
860 db_exec,
861 exec_config,
862 Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
863 SUPPORTED_RETURN_VALUE_OK_EMPTY,
864 Version::new(2),
865 None,
866 ))),
867 tick_fn,
868 )
869 .await;
870 assert_matches!(
871 execution_log.events.get(2).unwrap(),
872 ExecutionEvent {
873 event: ExecutionEventInner::Finished {
874 result: SupportedFunctionReturnValue::Ok { ok: None },
875 http_client_traces: None
876 },
877 created_at: _,
878 backtrace_id: None,
879 version: Version(2),
880 }
881 );
882 }
883
884 #[tokio::test]
885 async fn execute_simple_lifecycle_task_based_mem() {
886 set_up();
887 let created_at = Now.now();
888 let clock_fn = ConstClock(created_at);
889 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
890 let exec_config = ExecConfig {
891 batch_size: 1,
892 lock_expiry: Duration::from_secs(1),
893 tick_sleep: Duration::ZERO,
894 component_id: ComponentId::dummy_activity(),
895 task_limiter: None,
896 executor_id: ExecutorId::generate(),
897 retry_config: ComponentRetryConfig::ZERO,
898 };
899
900 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
901 SUPPORTED_RETURN_VALUE_OK_EMPTY,
902 Version::new(2),
903 None,
904 )));
905
906 let execution_log = create_and_tick(
907 CreateAndTickConfig {
908 execution_id: ExecutionId::generate(),
909 created_at,
910 executed_at: created_at,
911 },
912 clock_fn,
913 db_pool.connection().as_ref(),
914 db_exec,
915 exec_config,
916 worker,
917 tick_fn,
918 )
919 .await;
920 assert_matches!(
921 execution_log.events.get(2).unwrap(),
922 ExecutionEvent {
923 event: ExecutionEventInner::Finished {
924 result: SupportedFunctionReturnValue::Ok { ok: None },
925 http_client_traces: None
926 },
927 created_at: _,
928 backtrace_id: None,
929 version: Version(2),
930 }
931 );
932 db_close.close().await;
933 }
934
    /// Inputs of `create_and_tick`.
    struct CreateAndTickConfig {
        /// Id of the execution to create.
        execution_id: ExecutionId,
        /// Used as both `created_at` and `scheduled_at` of the create request.
        created_at: DateTime<Utc>,
        /// Timestamp handed to the tick callback.
        executed_at: DateTime<Utc>,
    }
940
    /// Creates an execution of `FFQN_SOME`, invokes `tick` once and returns the resulting
    /// execution log.
    ///
    /// Before returning it asserts the first three events: `Created` at version 0 with the
    /// configured timestamp, `Locked` at version 1 no earlier than creation, and any event
    /// at version 2 no earlier than the lock.
    async fn create_and_tick<
        W: Worker,
        C: ClockFn,
        T: FnMut(ExecConfig, C, Arc<dyn DbExecutor>, Arc<W>, DateTime<Utc>) -> F,
        F: Future<Output = Vec<ExecutionId>>,
    >(
        config: CreateAndTickConfig,
        clock_fn: C,
        db_connection: &dyn DbConnection,
        db_exec: Arc<dyn DbExecutor>,
        exec_config: ExecConfig,
        worker: Arc<W>,
        mut tick: T,
    ) -> ExecutionLog {
        // Create an execution without a parent, scheduled at its creation time.
        db_connection
            .create(CreateRequest {
                created_at: config.created_at,
                execution_id: config.execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: config.created_at,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();
        // Run the tick; the returned execution ids are not checked here.
        tick(exec_config, clock_fn, db_exec, worker, config.executed_at).await;
        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
        debug!("Execution history after tick: {execution_log:?}");
        // Event 0: created with the requested timestamp.
        assert_matches!(
            execution_log.events.first().unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Created { .. },
                created_at: actually_created_at,
                backtrace_id: None,
                version: Version(0),
            }
            if config.created_at == *actually_created_at
        );
        // Event 1: locked, not before the creation time.
        let locked_at = assert_matches!(
            execution_log.events.get(1).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: locked_at,
                backtrace_id: None,
                version: Version(1),
            } if config.created_at <= *locked_at
            => *locked_at
        );
        // Event 2: whatever the worker produced, not before the lock.
        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
            event: _,
            created_at: executed_at,
            backtrace_id: None,
            version: Version(2),
        } if *executed_at >= locked_at);
        execution_log
    }
1003
    /// With one retry configured, an activity trap is recorded as `TemporarilyFailed`;
    /// the execution is not picked up before the backoff expires and finishes successfully
    /// on the tick after it.
    #[tokio::test]
    async fn activity_trap_should_trigger_an_execution_retry() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
        let retry_exp_backoff = Duration::from_millis(100);
        let retry_config = ComponentRetryConfig {
            max_retries: 1,
            retry_exp_backoff,
        };
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
            retry_config,
        };
        let expected_reason = "error reason";
        let expected_detail = "error detail";
        // First run: the worker traps.
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: Some(expected_detail.to_string()),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at: sim_clock.now(),
                executed_at: sim_clock.now(),
            },
            sim_clock.clone(),
            db_pool.connection().as_ref(),
            db_exec.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        assert_eq!(3, execution_log.events.len());
        {
            // The trap is stored as a temporary failure with the configured backoff.
            let (reason, detail, at, expires_at) = assert_matches!(
                &execution_log.events.get(2).unwrap(),
                ExecutionEvent {
                    event: ExecutionEventInner::TemporarilyFailed {
                        reason,
                        detail,
                        backoff_expires_at,
                        http_client_traces: None,
                    },
                    created_at: at,
                    backtrace_id: None,
                    version: Version(2),
                }
                => (reason, detail, *at, *backoff_expires_at)
            );
            assert_eq!(format!("activity trap: {expected_reason}"), reason.deref());
            assert_eq!(Some(expected_detail), detail.as_deref());
            assert_eq!(at, sim_clock.now());
            assert_eq!(sim_clock.now() + retry_config.retry_exp_backoff, expires_at);
        }
        // Second run: the worker succeeds and expects to be run at version 4.
        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
            std::sync::Mutex::new(IndexMap::from([(
                Version::new(4),
                (
                    vec![],
                    WorkerResult::Ok(SUPPORTED_RETURN_VALUE_OK_EMPTY, Version::new(4), None),
                ),
            )])),
        )));
        // The clock has not moved yet, so this tick must not run anything.
        assert!(
            tick_fn(
                exec_config.clone(),
                sim_clock.clone(),
                db_exec.clone(),
                worker.clone(),
                sim_clock.now(),
            )
            .await
            .is_empty()
        );
        // Advance the clock by the backoff and tick again.
        sim_clock.move_time_forward(retry_config.retry_exp_backoff);
        tick_fn(
            exec_config,
            sim_clock.clone(),
            db_exec.clone(),
            worker,
            sim_clock.now(),
        )
        .await;
        // The connection lives only inside this block, so it is dropped before the close.
        let execution_log = {
            let db_connection = db_pool.connection();
            db_connection
                .get(&execution_log.execution_id)
                .await
                .unwrap()
        };
        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
        assert_matches!(
            execution_log.events.get(3).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: at,
                backtrace_id: None,
                version: Version(3),
            } if *at == sim_clock.now()
        );
        assert_matches!(
            execution_log.events.get(4).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: SupportedFunctionReturnValue::Ok{ok:None},
                    http_client_traces: None
                },
                created_at: finished_at,
                backtrace_id: None,
                version: Version(4),
            } if *finished_at == sim_clock.now()
        );
        db_close.close().await;
    }
1133
1134 #[tokio::test]
1135 async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1136 set_up();
1137 let created_at = Now.now();
1138 let clock_fn = ConstClock(created_at);
1139 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1140 let exec_config = ExecConfig {
1141 batch_size: 1,
1142 lock_expiry: Duration::from_secs(1),
1143 tick_sleep: Duration::ZERO,
1144 component_id: ComponentId::dummy_activity(),
1145 task_limiter: None,
1146 executor_id: ExecutorId::generate(),
1147 retry_config: ComponentRetryConfig::ZERO,
1148 };
1149
1150 let reason = "error reason";
1151 let expected_reason = format!("activity trap: {reason}");
1152 let expected_detail = "error detail";
1153 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1154 WorkerError::ActivityTrap {
1155 reason: reason.to_string(),
1156 trap_kind: concepts::TrapKind::Trap,
1157 detail: Some(expected_detail.to_string()),
1158 version: Version::new(2),
1159 http_client_traces: None,
1160 },
1161 )));
1162 let execution_log = create_and_tick(
1163 CreateAndTickConfig {
1164 execution_id: ExecutionId::generate(),
1165 created_at,
1166 executed_at: created_at,
1167 },
1168 clock_fn,
1169 db_pool.connection().as_ref(),
1170 db_exec,
1171 exec_config.clone(),
1172 worker,
1173 tick_fn,
1174 )
1175 .await;
1176 assert_eq!(3, execution_log.events.len());
1177 let (reason, kind, detail) = assert_matches!(
1178 &execution_log.events.get(2).unwrap(),
1179 ExecutionEvent {
1180 event: ExecutionEventInner::Finished{
1181 result: SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError{reason, kind, detail}),
1182 http_client_traces: None
1183 },
1184 created_at: at,
1185 backtrace_id: None,
1186 version: Version(2),
1187 } if *at == created_at
1188 => (reason, kind, detail)
1189 );
1190
1191 assert_eq!(Some(expected_reason), *reason);
1192 assert_eq!(Some(expected_detail), detail.as_deref());
1193 assert_eq!(ExecutionFailureKind::Uncategorized, *kind);
1194
1195 db_close.close().await;
1196 }
1197
1198 #[tokio::test]
1199 async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1200 let worker_error = WorkerError::ActivityTrap {
1201 reason: "error reason".to_string(),
1202 trap_kind: TrapKind::Trap,
1203 detail: Some("detail".to_string()),
1204 version: Version::new(2),
1205 http_client_traces: None,
1206 };
1207 let expected_child_err = FinishedExecutionError {
1208 kind: ExecutionFailureKind::Uncategorized,
1209 reason: Some("activity trap: error reason".to_string()),
1210 detail: Some("detail".to_string()),
1211 };
1212 child_execution_permanently_failed_should_notify_parent(
1213 WorkerResult::Err(worker_error),
1214 expected_child_err,
1215 )
1216 .await;
1217 }
1218
1219 #[tokio::test]
1220 async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1221 let expected_child_err = FinishedExecutionError {
1222 kind: ExecutionFailureKind::TimedOut,
1223 reason: None,
1224 detail: None,
1225 };
1226 child_execution_permanently_failed_should_notify_parent(
1227 WorkerResult::DbUpdatedByWorkerOrWatcher,
1228 expected_child_err,
1229 )
1230 .await;
1231 }
1232
1233 async fn child_execution_permanently_failed_should_notify_parent(
1234 worker_result: WorkerResult,
1235 expected_child_err: FinishedExecutionError,
1236 ) {
1237 use concepts::storage::JoinSetResponseEventOuter;
1238 const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1239
1240 set_up();
1241 let sim_clock = SimClock::default();
1242 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1243
1244 let parent_worker = Arc::new(SimpleWorker::with_single_result(
1245 WorkerResult::DbUpdatedByWorkerOrWatcher,
1246 ));
1247 let parent_execution_id = ExecutionId::generate();
1248 db_pool
1249 .connection()
1250 .create(CreateRequest {
1251 created_at: sim_clock.now(),
1252 execution_id: parent_execution_id.clone(),
1253 ffqn: FFQN_SOME,
1254 params: Params::empty(),
1255 parent: None,
1256 metadata: concepts::ExecutionMetadata::empty(),
1257 scheduled_at: sim_clock.now(),
1258 component_id: ComponentId::dummy_activity(),
1259 scheduled_by: None,
1260 })
1261 .await
1262 .unwrap();
1263 let parent_executor_id = ExecutorId::generate();
1264 tick_fn(
1265 ExecConfig {
1266 batch_size: 1,
1267 lock_expiry: LOCK_EXPIRY,
1268 tick_sleep: Duration::ZERO,
1269 component_id: ComponentId::dummy_activity(),
1270 task_limiter: None,
1271 executor_id: parent_executor_id,
1272 retry_config: ComponentRetryConfig::ZERO,
1273 },
1274 sim_clock.clone(),
1275 db_exec.clone(),
1276 parent_worker,
1277 sim_clock.now(),
1278 )
1279 .await;
1280
1281 let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1282 let child_execution_id = parent_execution_id.next_level(&join_set_id);
1283 {
1285 let params = Params::empty();
1286 let child = CreateRequest {
1287 created_at: sim_clock.now(),
1288 execution_id: ExecutionId::Derived(child_execution_id.clone()),
1289 ffqn: FFQN_CHILD,
1290 params: params.clone(),
1291 parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1292 metadata: concepts::ExecutionMetadata::empty(),
1293 scheduled_at: sim_clock.now(),
1294 component_id: ComponentId::dummy_activity(),
1295 scheduled_by: None,
1296 };
1297 let current_time = sim_clock.now();
1298 let join_set = AppendRequest {
1299 created_at: current_time,
1300 event: ExecutionEventInner::HistoryEvent {
1301 event: HistoryEvent::JoinSetCreate {
1302 join_set_id: join_set_id.clone(),
1303 },
1304 },
1305 };
1306 let child_exec_req = AppendRequest {
1307 created_at: current_time,
1308 event: ExecutionEventInner::HistoryEvent {
1309 event: HistoryEvent::JoinSetRequest {
1310 join_set_id: join_set_id.clone(),
1311 request: JoinSetRequest::ChildExecutionRequest {
1312 child_execution_id: child_execution_id.clone(),
1313 target_ffqn: FFQN_CHILD,
1314 params,
1315 },
1316 },
1317 },
1318 };
1319 let join_next = AppendRequest {
1320 created_at: current_time,
1321 event: ExecutionEventInner::HistoryEvent {
1322 event: HistoryEvent::JoinNext {
1323 join_set_id: join_set_id.clone(),
1324 run_expires_at: sim_clock.now(),
1325 closing: false,
1326 requested_ffqn: Some(FFQN_CHILD),
1327 },
1328 },
1329 };
1330 db_pool
1331 .connection()
1332 .append_batch_create_new_execution(
1333 current_time,
1334 vec![join_set, child_exec_req, join_next],
1335 parent_execution_id.clone(),
1336 Version::new(2),
1337 vec![child],
1338 )
1339 .await
1340 .unwrap();
1341 }
1342
1343 let child_worker =
1344 Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));
1345
1346 tick_fn(
1348 ExecConfig {
1349 batch_size: 1,
1350 lock_expiry: LOCK_EXPIRY,
1351 tick_sleep: Duration::ZERO,
1352 component_id: ComponentId::dummy_activity(),
1353 task_limiter: None,
1354 executor_id: ExecutorId::generate(),
1355 retry_config: ComponentRetryConfig::ZERO,
1356 },
1357 sim_clock.clone(),
1358 db_exec.clone(),
1359 child_worker,
1360 sim_clock.now(),
1361 )
1362 .await;
1363 if matches!(expected_child_err.kind, ExecutionFailureKind::TimedOut) {
1364 sim_clock.move_time_forward(LOCK_EXPIRY);
1366 expired_timers_watcher::tick(db_pool.connection().as_ref(), sim_clock.now())
1367 .await
1368 .unwrap();
1369 }
1370 let child_log = db_pool
1371 .connection()
1372 .get(&ExecutionId::Derived(child_execution_id.clone()))
1373 .await
1374 .unwrap();
1375 assert!(child_log.pending_state.is_finished());
1376 assert_eq!(
1377 Version(2),
1378 child_log.next_version,
1379 "created = 0, locked = 1, with_single_result = 2"
1380 );
1381 assert_eq!(
1382 ExecutionEventInner::Finished {
1383 result: SupportedFunctionReturnValue::ExecutionError(expected_child_err),
1384 http_client_traces: None
1385 },
1386 child_log.last_event().event
1387 );
1388 let parent_log = db_pool
1389 .connection()
1390 .get(&parent_execution_id)
1391 .await
1392 .unwrap();
1393 assert_matches!(
1394 parent_log.pending_state,
1395 PendingState::PendingAt {
1396 scheduled_at,
1397 last_lock: Some(LockedBy { executor_id: found_executor_id, run_id: _}),
1398 } if scheduled_at == sim_clock.now() && found_executor_id == parent_executor_id,
1399 "parent should be back to pending"
1400 );
1401 let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1402 parent_log.responses.last(),
1403 Some(JoinSetResponseEventOuter{
1404 created_at: at,
1405 event: JoinSetResponseEvent{
1406 join_set_id: found_join_set_id,
1407 event: JoinSetResponse::ChildExecutionFinished {
1408 child_execution_id: found_child_execution_id,
1409 finished_version,
1410 result: found_result,
1411 }
1412 }
1413 })
1414 if *at == sim_clock.now()
1415 => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1416 );
1417 assert_eq!(join_set_id, *found_join_set_id);
1418 assert_eq!(child_execution_id, *found_child_execution_id);
1419 assert_eq!(child_log.next_version, *child_finished_version);
1420 assert_matches!(
1421 found_result,
1422 SupportedFunctionReturnValue::ExecutionError(_)
1423 );
1424
1425 db_close.close().await;
1426 }
1427
1428 #[derive(Clone, Debug)]
1429 struct SleepyWorker {
1430 duration: Duration,
1431 result: SupportedFunctionReturnValue,
1432 exported: [FunctionMetadata; 1],
1433 }
1434
1435 #[async_trait]
1436 impl Worker for SleepyWorker {
1437 async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1438 tokio::time::sleep(self.duration).await;
1439 WorkerResult::Ok(self.result.clone(), ctx.version, None)
1440 }
1441
1442 fn exported_functions(&self) -> &[FunctionMetadata] {
1443 &self.exported
1444 }
1445 }
1446
1447 #[tokio::test]
1448 async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1449 set_up();
1450 let sim_clock = SimClock::default();
1451 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1452 let lock_expiry = Duration::from_millis(100);
1453 let timeout_duration = Duration::from_millis(300);
1454 let retry_config = ComponentRetryConfig {
1455 max_retries: 1,
1456 retry_exp_backoff: timeout_duration,
1457 };
1458 let exec_config = ExecConfig {
1459 batch_size: 1,
1460 lock_expiry,
1461 tick_sleep: Duration::ZERO,
1462 component_id: ComponentId::dummy_activity(),
1463 task_limiter: None,
1464 executor_id: ExecutorId::generate(),
1465 retry_config,
1466 };
1467
1468 let worker = Arc::new(SleepyWorker {
1469 duration: lock_expiry + Duration::from_millis(1), result: SUPPORTED_RETURN_VALUE_OK_EMPTY,
1471 exported: [FunctionMetadata {
1472 ffqn: FFQN_SOME,
1473 parameter_types: ParameterTypes::default(),
1474 return_type: RETURN_TYPE_DUMMY,
1475 extension: None,
1476 submittable: true,
1477 }],
1478 });
1479 let execution_id = ExecutionId::generate();
1481 let db_connection = db_pool.connection();
1482 db_connection
1483 .create(CreateRequest {
1484 created_at: sim_clock.now(),
1485 execution_id: execution_id.clone(),
1486 ffqn: FFQN_SOME,
1487 params: Params::empty(),
1488 parent: None,
1489 metadata: concepts::ExecutionMetadata::empty(),
1490 scheduled_at: sim_clock.now(),
1491 component_id: ComponentId::dummy_activity(),
1492 scheduled_by: None,
1493 })
1494 .await
1495 .unwrap();
1496
1497 let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
1498 let executor = ExecTask::new_test(
1499 worker,
1500 exec_config.clone(),
1501 sim_clock.clone(),
1502 db_exec.clone(),
1503 ffqns,
1504 );
1505 let mut first_execution_progress = executor
1506 .tick(sim_clock.now(), RunId::generate())
1507 .await
1508 .unwrap();
1509 assert_eq!(1, first_execution_progress.executions.len());
1510 sim_clock.move_time_forward(lock_expiry);
1512 let now_after_first_lock_expiry = sim_clock.now();
1514 {
1515 debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1516 let cleanup_progress = executor
1517 .tick(now_after_first_lock_expiry, RunId::generate())
1518 .await
1519 .unwrap();
1520 assert!(cleanup_progress.executions.is_empty());
1521 }
1522 {
1523 let expired_locks = expired_timers_watcher::tick(
1524 db_pool.connection().as_ref(),
1525 now_after_first_lock_expiry,
1526 )
1527 .await
1528 .unwrap()
1529 .expired_locks;
1530 assert_eq!(1, expired_locks);
1531 }
1532 assert!(
1533 !first_execution_progress
1534 .executions
1535 .pop()
1536 .unwrap()
1537 .1
1538 .is_finished()
1539 );
1540
1541 let execution_log = db_connection.get(&execution_id).await.unwrap();
1542 let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1543 assert_matches!(
1544 &execution_log.events.get(2).unwrap(),
1545 ExecutionEvent {
1546 event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
1547 created_at: at,
1548 backtrace_id: None,
1549 version: Version(2),
1550 } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1551 );
1552 assert_matches!(
1553 execution_log.pending_state,
1554 PendingState::PendingAt {
1555 scheduled_at: found_scheduled_by,
1556 last_lock: Some(LockedBy {
1557 executor_id: found_executor_id,
1558 run_id: _,
1559 }),
1560 } if found_scheduled_by == expected_first_timeout_expiry && found_executor_id == exec_config.executor_id
1561 );
1562 sim_clock.move_time_forward(timeout_duration);
1563 let now_after_first_timeout = sim_clock.now();
1564 debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1565
1566 let mut second_execution_progress = executor
1567 .tick(now_after_first_timeout, RunId::generate())
1568 .await
1569 .unwrap();
1570 assert_eq!(1, second_execution_progress.executions.len());
1571
1572 sim_clock.move_time_forward(lock_expiry);
1574 let now_after_second_lock_expiry = sim_clock.now();
1576 debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1577 {
1578 let cleanup_progress = executor
1579 .tick(now_after_second_lock_expiry, RunId::generate())
1580 .await
1581 .unwrap();
1582 assert!(cleanup_progress.executions.is_empty());
1583 }
1584 {
1585 let expired_locks = expired_timers_watcher::tick(
1586 db_pool.connection().as_ref(),
1587 now_after_second_lock_expiry,
1588 )
1589 .await
1590 .unwrap()
1591 .expired_locks;
1592 assert_eq!(1, expired_locks);
1593 }
1594 assert!(
1595 !second_execution_progress
1596 .executions
1597 .pop()
1598 .unwrap()
1599 .1
1600 .is_finished()
1601 );
1602
1603 drop(db_connection);
1604 drop(executor);
1605 db_close.close().await;
1606 }
1607}