1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::prefixed_ulid::RunId;
5use concepts::storage::{
6 AppendEventsToExecution, AppendRequest, AppendResponseToExecution, DbErrorGeneric,
7 DbErrorWrite, DbExecutor, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
8 LockedExecution,
9};
10use concepts::time::{ClockFn, Sleep};
11use concepts::{ComponentId, FunctionMetadata, StrVariant, SupportedFunctionReturnValue};
12use concepts::{ExecutionId, FunctionFqn, prefixed_ulid::ExecutorId};
13use concepts::{
14 FinishedExecutionError,
15 storage::{ExecutionEventInner, JoinSetResponse, Version},
16};
17use concepts::{JoinSetId, PermanentFailureKind};
18use std::fmt::Display;
19use std::{
20 sync::{
21 Arc,
22 atomic::{AtomicBool, Ordering},
23 },
24 time::Duration,
25};
26use tokio::task::{AbortHandle, JoinHandle};
27use tracing::{Instrument, Level, Span, debug, error, info, info_span, instrument, trace, warn};
28
/// Configuration of a single executor, see [`ExecTask`].
#[derive(Debug, Clone)]
pub struct ExecConfig {
    /// Added to the tick's `executed_at` to obtain both the lock expiry sent to the database
    /// and the execution deadline handed to the worker.
    pub lock_expiry: Duration,
    /// Duration passed to the `Sleep` implementation while waiting for pending executions
    /// between two ticks.
    pub tick_sleep: Duration,
    /// Upper bound on the number of executions requested from the database in one tick.
    pub batch_size: u32,
    /// Component on whose behalf executions are locked; also recorded in tracing spans.
    pub component_id: ComponentId,
    /// Optional semaphore; when set, one permit is acquired per execution and held until the
    /// spawned task ends. When `None`, a tick is limited by `batch_size` only.
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
    /// Identifier of this executor, passed to the database when locking and recorded in
    /// tracing spans.
    pub executor_id: ExecutorId,
}
38
/// Executor that locks pending executions of a set of functions and runs each of them on a
/// [`Worker`] in its own Tokio task.
pub struct ExecTask<C: ClockFn> {
    // Runs the locked executions.
    worker: Arc<dyn Worker>,
    config: ExecConfig,
    // Source of the current time; used to timestamp worker results.
    clock_fn: C,
    // Database handle used for locking executions and appending events.
    db_exec: Arc<dyn DbExecutor>,
    // Functions whose pending executions are locked by this executor.
    ffqns: Arc<[FunctionFqn]>,
}
46
/// Outcome of a single tick: the locked executions paired with the join handles of the tasks
/// that run them.
#[derive(derive_more::Debug, Default)]
pub struct ExecutionProgress {
    // Only read by `wait_for_tasks`, which is compiled with the `test` feature,
    // hence the `dead_code` allowance.
    #[debug(skip)]
    #[allow(dead_code)]
    executions: Vec<(ExecutionId, JoinHandle<()>)>,
}
53
54impl ExecutionProgress {
55 #[cfg(feature = "test")]
56 pub async fn wait_for_tasks(self) -> Vec<ExecutionId> {
57 let mut vec = Vec::new();
58 for (exe, join_handle) in self.executions {
59 vec.push(exe);
60 join_handle.await.unwrap();
61 }
62 vec
63 }
64}
65
/// Handle to an executor loop spawned by [`ExecTask::spawn_new`].
///
/// Dropping the handle aborts the loop if it is still running; use `close` to let the loop
/// finish on its own first.
#[derive(derive_more::Debug)]
pub struct ExecutorTaskHandle {
    // Flag checked by the executor loop before each iteration; set by `close`.
    #[debug(skip)]
    is_closing: Arc<AtomicBool>,
    // Used to poll for completion in `close` and to abort the task on drop.
    #[debug(skip)]
    abort_handle: AbortHandle,
    // The two ids below are only used as tracing span fields.
    component_id: ComponentId,
    executor_id: ExecutorId,
}
75
76impl ExecutorTaskHandle {
77 #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
78 pub async fn close(&self) {
79 trace!("Gracefully closing");
80 self.is_closing.store(true, Ordering::Relaxed);
81 while !self.abort_handle.is_finished() {
82 tokio::time::sleep(Duration::from_millis(1)).await;
83 }
84 debug!("Gracefully closed");
85 }
86}
87
88impl Drop for ExecutorTaskHandle {
89 #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id = %self.executor_id, component_id = %self.component_id))]
90 fn drop(&mut self) {
91 if self.abort_handle.is_finished() {
92 return;
93 }
94 warn!("Aborting the executor task");
95 self.abort_handle.abort();
96 }
97}
98
/// Public wrapper around `extract_exported_ffqns_noext`, available only with the `test` feature.
#[cfg(feature = "test")]
pub fn extract_exported_ffqns_noext_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    extract_exported_ffqns_noext(worker)
}
103
104fn extract_exported_ffqns_noext(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
105 worker
106 .exported_functions()
107 .iter()
108 .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
109 .collect::<Arc<_>>()
110}
111
112impl<C: ClockFn + 'static> ExecTask<C> {
    /// Creates an executor from its parts without spawning anything.
    ///
    /// `ffqns` is the set of functions whose pending executions will be locked by `tick`.
    /// Available only with the `test` feature.
    #[cfg(feature = "test")]
    pub fn new(
        worker: Arc<dyn Worker>,
        config: ExecConfig,
        clock_fn: C,
        db_exec: Arc<dyn DbExecutor>,
        ffqns: Arc<[FunctionFqn]>,
    ) -> Self {
        Self {
            worker,
            config,
            clock_fn,
            db_exec,
            ffqns,
        }
    }
129
130 #[cfg(feature = "test")]
131 pub fn new_all_ffqns_test(
132 worker: Arc<dyn Worker>,
133 config: ExecConfig,
134 clock_fn: C,
135 db_exec: Arc<dyn DbExecutor>,
136 ) -> Self {
137 let ffqns = extract_exported_ffqns_noext(worker.as_ref());
138 Self {
139 worker,
140 config,
141 clock_fn,
142 db_exec,
143 ffqns,
144 }
145 }
146
147 pub fn spawn_new(
148 worker: Arc<dyn Worker>,
149 config: ExecConfig,
150 clock_fn: C,
151 db_exec: Arc<dyn DbExecutor>,
152 sleep: impl Sleep + 'static,
153 ) -> ExecutorTaskHandle {
154 let is_closing = Arc::new(AtomicBool::default());
155 let is_closing_inner = is_closing.clone();
156 let ffqns = extract_exported_ffqns_noext(worker.as_ref());
157 let component_id = config.component_id.clone();
158 let executor_id = config.executor_id;
159 let abort_handle = tokio::spawn(async move {
160 debug!(executor_id = %config.executor_id, component_id = %config.component_id, "Spawned executor");
161 let task = ExecTask {
162 worker,
163 config,
164 db_exec,
165 ffqns: ffqns.clone(),
166 clock_fn: clock_fn.clone(),
167 };
168 while !is_closing_inner.load(Ordering::Relaxed) {
169 let _ = task.tick(clock_fn.now(), RunId::generate()).await;
170
171 task.db_exec
172 .wait_for_pending(clock_fn.now(), ffqns.clone(), {
173 let sleep = sleep.clone();
174 Box::pin(async move { sleep.sleep(task.config.tick_sleep).await })})
175 .await;
176 }
177 })
178 .abort_handle();
179 ExecutorTaskHandle {
180 is_closing,
181 abort_handle,
182 component_id,
183 executor_id,
184 }
185 }
186
187 fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
188 if let Some(task_limiter) = &self.config.task_limiter {
189 let mut locks = Vec::new();
190 for _ in 0..self.config.batch_size {
191 if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
192 locks.push(Some(permit));
193 } else {
194 break;
195 }
196 }
197 locks
198 } else {
199 let mut vec = Vec::with_capacity(self.config.batch_size as usize);
200 for _ in 0..self.config.batch_size {
201 vec.push(None);
202 }
203 vec
204 }
205 }
206
    /// Runs a single tick and returns its progress without waiting for the spawned tasks.
    ///
    /// Available only with the `test` feature.
    ///
    /// # Panics
    /// Panics if the tick returns a database error.
    #[cfg(feature = "test")]
    pub async fn tick_test(&self, executed_at: DateTime<Utc>, run_id: RunId) -> ExecutionProgress {
        self.tick(executed_at, run_id).await.unwrap()
    }
211
212 #[cfg(feature = "test")]
213 pub async fn tick_test_await(
214 &self,
215 executed_at: DateTime<Utc>,
216 run_id: RunId,
217 ) -> Vec<ExecutionId> {
218 self.tick(executed_at, run_id)
219 .await
220 .unwrap()
221 .wait_for_tasks()
222 .await
223 }
224
225 #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.config.executor_id, component_id = %self.config.component_id))]
226 async fn tick(
227 &self,
228 executed_at: DateTime<Utc>,
229 run_id: RunId,
230 ) -> Result<ExecutionProgress, DbErrorGeneric> {
231 let locked_executions = {
232 let mut permits = self.acquire_task_permits();
233 if permits.is_empty() {
234 return Ok(ExecutionProgress::default());
235 }
236 let lock_expires_at = executed_at + self.config.lock_expiry;
237 let locked_executions = self
238 .db_exec
239 .lock_pending(
240 permits.len(), executed_at, self.ffqns.clone(),
243 executed_at, self.config.component_id.clone(),
245 self.config.executor_id,
246 lock_expires_at,
247 run_id,
248 )
249 .await?;
250 while permits.len() > locked_executions.len() {
252 permits.pop();
253 }
254 assert_eq!(permits.len(), locked_executions.len());
255 locked_executions.into_iter().zip(permits)
256 };
257 let execution_deadline = executed_at + self.config.lock_expiry;
258
259 let mut executions = Vec::with_capacity(locked_executions.len());
260 for (locked_execution, permit) in locked_executions {
261 let execution_id = locked_execution.execution_id.clone();
262 let join_handle = {
263 let worker = self.worker.clone();
264 let db = self.db_exec.clone();
265 let clock_fn = self.clock_fn.clone();
266 let run_id = locked_execution.run_id;
267 let worker_span = info_span!(parent: None, "worker",
268 "otel.name" = format!("worker {}", locked_execution.ffqn),
269 %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.config.executor_id, component_id = %self.config.component_id);
270 locked_execution.metadata.enrich(&worker_span);
271 tokio::spawn({
272 let worker_span2 = worker_span.clone();
273 async move {
274 let _permit = permit;
275 let res = Self::run_worker(
276 worker,
277 db.as_ref(),
278 execution_deadline,
279 clock_fn,
280 locked_execution,
281 worker_span2,
282 )
283 .await;
284 if let Err(db_error) = res {
285 error!("Got db error `{db_error:?}`, expecting watcher to mark execution as timed out");
286 }
287 }
288 .instrument(worker_span)
289 })
290 };
291 executions.push((execution_id, join_handle));
292 }
293 Ok(ExecutionProgress { executions })
294 }
295
296 async fn run_worker(
297 worker: Arc<dyn Worker>,
298 db_exec: &dyn DbExecutor,
299 execution_deadline: DateTime<Utc>,
300 clock_fn: C,
301 locked_execution: LockedExecution,
302 worker_span: Span,
303 ) -> Result<(), DbErrorWrite> {
304 debug!("Worker::run starting");
305 trace!(
306 version = %locked_execution.next_version,
307 params = ?locked_execution.params,
308 event_history = ?locked_execution.event_history,
309 "Worker::run starting"
310 );
311 let can_be_retried = ExecutionLog::can_be_retried_after(
312 locked_execution.intermittent_event_count + 1,
313 locked_execution.max_retries,
314 locked_execution.retry_exp_backoff,
315 );
316 let unlock_expiry_on_limit_reached =
317 ExecutionLog::compute_retry_duration_when_retrying_forever(
318 locked_execution.intermittent_event_count + 1,
319 locked_execution.retry_exp_backoff,
320 );
321 let ctx = WorkerContext {
322 execution_id: locked_execution.execution_id.clone(),
323 metadata: locked_execution.metadata,
324 ffqn: locked_execution.ffqn,
325 params: locked_execution.params,
326 event_history: locked_execution.event_history,
327 responses: locked_execution
328 .responses
329 .into_iter()
330 .map(|outer| outer.event)
331 .collect(),
332 version: locked_execution.next_version,
333 execution_deadline,
334 can_be_retried: can_be_retried.is_some(),
335 run_id: locked_execution.run_id,
336 worker_span,
337 };
338 let worker_result = worker.run(ctx).await;
339 trace!(?worker_result, "Worker::run finished");
340 let result_obtained_at = clock_fn.now();
341 match Self::worker_result_to_execution_event(
342 locked_execution.execution_id,
343 worker_result,
344 result_obtained_at,
345 locked_execution.parent,
346 can_be_retried,
347 unlock_expiry_on_limit_reached,
348 )? {
349 Some(append) => {
350 trace!("Appending {append:?}");
351 append.append(db_exec).await
352 }
353 None => Ok(()),
354 }
355 }
356
    /// Translates a worker result into the event that should be appended to the execution log.
    ///
    /// * `parent` - when set and the execution reaches a finished state, the returned
    ///   [`Append`] also carries a `ChildFinishedResponse` for the parent.
    /// * `can_be_retried` - `Some(backoff)` when a temporary failure may be retried after
    ///   `backoff`, `None` when the failure must become permanent.
    /// * `unlock_expiry_on_limit_reached` - backoff used for `WorkerError::LimitReached`.
    ///
    /// Returns `Ok(None)` for `WorkerResult::DbUpdatedByWorkerOrWatcher`, where nothing is
    /// appended here.
    ///
    /// # Errors
    /// Returns the inner error of `WorkerError::DbError`.
    ///
    /// # Panics
    /// Panics on `WorkerError::ActivityReturnedError` when `can_be_retried` is `None`.
    fn worker_result_to_execution_event(
        execution_id: ExecutionId,
        worker_result: WorkerResult,
        result_obtained_at: DateTime<Utc>,
        parent: Option<(ExecutionId, JoinSetId)>,
        can_be_retried: Option<Duration>,
        unlock_expiry_on_limit_reached: Duration,
    ) -> Result<Option<Append>, DbErrorWrite> {
        Ok(match worker_result {
            WorkerResult::Ok(result, new_version, http_client_traces) => {
                info!(
                    "Execution finished: {}",
                    result.as_pending_state_finished_result()
                );
                // The parent, if any, receives a copy of the result.
                let child_finished =
                    parent.map(
                        |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
                            parent_execution_id,
                            parent_join_set,
                            result: result.clone(),
                        },
                    );
                let primary_event = AppendRequest {
                    created_at: result_obtained_at,
                    event: ExecutionEventInner::Finished {
                        result,
                        http_client_traces,
                    },
                };

                Some(Append {
                    created_at: result_obtained_at,
                    primary_event,
                    execution_id,
                    version: new_version,
                    child_finished,
                })
            }
            WorkerResult::DbUpdatedByWorkerOrWatcher => None,
            WorkerResult::Err(err) => {
                // Rendered before `err` is destructured by the match below.
                let reason_full = err.to_string();
                let (primary_event, child_finished, version) = match err {
                    WorkerError::TemporaryTimeout {
                        http_client_traces,
                        version,
                    } => {
                        if let Some(duration) = can_be_retried {
                            let backoff_expires_at = result_obtained_at + duration;
                            info!(
                                "Temporary timeout, retrying after {duration:?} at {backoff_expires_at}"
                            );
                            (
                                ExecutionEventInner::TemporarilyTimedOut {
                                    backoff_expires_at,
                                    http_client_traces,
                                },
                                None,
                                version,
                            )
                        } else {
                            // No retry allowed: the timeout becomes the final result.
                            info!("Permanent timeout");
                            let result = SupportedFunctionReturnValue::ExecutionError(
                                FinishedExecutionError::PermanentTimeout,
                            );
                            let child_finished =
                                parent.map(|(parent_execution_id, parent_join_set)| {
                                    ChildFinishedResponse {
                                        parent_execution_id,
                                        parent_join_set,
                                        result: result.clone(),
                                    }
                                });
                            (
                                ExecutionEventInner::Finished {
                                    result,
                                    http_client_traces,
                                },
                                child_finished,
                                version,
                            )
                        }
                    }
                    WorkerError::DbError(db_error) => {
                        // Nothing is appended; the caller receives the error.
                        return Err(db_error);
                    }
                    WorkerError::ActivityTrap {
                        reason: reason_inner,
                        trap_kind,
                        detail,
                        version,
                        http_client_traces,
                    } => retry_or_fail(
                        result_obtained_at,
                        parent,
                        can_be_retried,
                        reason_full,
                        reason_inner,
                        trap_kind,
                        detail,
                        version,
                        http_client_traces,
                    ),
                    WorkerError::ActivityPreopenedDirError {
                        reason_kind,
                        reason_inner,
                        version,
                    } => retry_or_fail(
                        result_obtained_at,
                        parent,
                        can_be_retried,
                        reason_full,
                        reason_inner,
                        reason_kind,
                        None, // detail
                        version,
                        None, // http_client_traces
                    ),
                    WorkerError::ActivityReturnedError {
                        detail,
                        version,
                        http_client_traces,
                    } => {
                        let duration = can_be_retried.expect(
                            "ActivityReturnedError must not be returned when retries are exhausted",
                        );
                        let expires_at = result_obtained_at + duration;
                        debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
                        (
                            ExecutionEventInner::TemporarilyFailed {
                                backoff_expires_at: expires_at,
                                reason_full: StrVariant::Static("activity returned error"),
                                reason_inner: StrVariant::Static("activity returned error"),
                                detail,
                                http_client_traces,
                            },
                            None,
                            version,
                        )
                    }
                    WorkerError::LimitReached {
                        reason: inner_reason,
                        version: new_version,
                    } => {
                        // Uses the dedicated backoff instead of `can_be_retried`.
                        let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
                        warn!(
                            "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
                        );
                        (
                            ExecutionEventInner::Unlocked {
                                backoff_expires_at: expires_at,
                                reason: StrVariant::from(reason_full),
                            },
                            None,
                            new_version,
                        )
                    }
                    WorkerError::FatalError(fatal_error, version) => {
                        // Fatal errors finish the execution regardless of `can_be_retried`.
                        info!("Fatal worker error - {fatal_error:?}");
                        let result = SupportedFunctionReturnValue::ExecutionError(
                            FinishedExecutionError::from(fatal_error),
                        );
                        let child_finished =
                            parent.map(|(parent_execution_id, parent_join_set)| {
                                ChildFinishedResponse {
                                    parent_execution_id,
                                    parent_join_set,
                                    result: result.clone(),
                                }
                            });
                        (
                            ExecutionEventInner::Finished {
                                result,
                                http_client_traces: None,
                            },
                            child_finished,
                            version,
                        )
                    }
                };
                Some(Append {
                    created_at: result_obtained_at,
                    primary_event: AppendRequest {
                        created_at: result_obtained_at,
                        event: primary_event,
                    },
                    execution_id,
                    version,
                    child_finished,
                })
            }
        })
    }
550}
551
552#[expect(clippy::too_many_arguments)]
553fn retry_or_fail(
554 result_obtained_at: DateTime<Utc>,
555 parent: Option<(ExecutionId, JoinSetId)>,
556 can_be_retried: Option<Duration>,
557 reason_full: String,
558 reason_inner: String,
559 err_kind_for_logs: impl Display,
560 detail: Option<String>,
561 version: Version,
562 http_client_traces: Option<Vec<concepts::storage::http_client_trace::HttpClientTrace>>,
563) -> (ExecutionEventInner, Option<ChildFinishedResponse>, Version) {
564 if let Some(duration) = can_be_retried {
565 let expires_at = result_obtained_at + duration;
566 debug!(
567 "Retrying activity with `{err_kind_for_logs}` execution after {duration:?} at {expires_at}"
568 );
569 (
570 ExecutionEventInner::TemporarilyFailed {
571 backoff_expires_at: expires_at,
572 reason_full: StrVariant::from(reason_full),
573 reason_inner: StrVariant::from(reason_inner),
574 detail,
575 http_client_traces,
576 },
577 None,
578 version,
579 )
580 } else {
581 info!("Activity with `{err_kind_for_logs}` marked as permanent failure - {reason_inner}");
582 let result = SupportedFunctionReturnValue::ExecutionError(
583 FinishedExecutionError::PermanentFailure {
584 reason_inner,
585 reason_full,
586 kind: PermanentFailureKind::ActivityTrap,
587 detail,
588 },
589 );
590
591 let child_finished =
592 parent.map(
593 |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
594 parent_execution_id,
595 parent_join_set,
596 result: result.clone(),
597 },
598 );
599 (
600 ExecutionEventInner::Finished {
601 result,
602 http_client_traces,
603 },
604 child_finished,
605 version,
606 )
607 }
608}
609
/// Data needed to notify a parent execution that one of its children has finished.
#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    /// Execution that receives the response.
    pub(crate) parent_execution_id: ExecutionId,
    /// Join set of the parent to which the response is added.
    pub(crate) parent_join_set: JoinSetId,
    /// Copy of the child's final result.
    pub(crate) result: SupportedFunctionReturnValue,
}
616
/// A pending write to the execution log, optionally combined with a response to the parent.
#[derive(Debug, Clone)]
pub(crate) struct Append {
    /// Timestamp used for the parent response and for the combined database call.
    pub(crate) created_at: DateTime<Utc>,
    /// Event appended to the execution identified by `execution_id`.
    pub(crate) primary_event: AppendRequest,
    pub(crate) execution_id: ExecutionId,
    /// Version passed to the database together with the event.
    pub(crate) version: Version,
    /// When set, the parent is notified as part of the same database call.
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}
625
626impl Append {
627 pub(crate) async fn append(self, db_exec: &dyn DbExecutor) -> Result<(), DbErrorWrite> {
628 if let Some(child_finished) = self.child_finished {
629 assert_matches!(
630 &self.primary_event,
631 AppendRequest {
632 event: ExecutionEventInner::Finished { .. },
633 ..
634 }
635 );
636 let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
637 let events = AppendEventsToExecution {
638 execution_id: self.execution_id,
639 version: self.version.clone(),
640 batch: vec![self.primary_event],
641 };
642 let response = AppendResponseToExecution {
643 parent_execution_id: child_finished.parent_execution_id,
644 parent_response_event: JoinSetResponseEventOuter {
645 created_at: self.created_at,
646 event: JoinSetResponseEvent {
647 join_set_id: child_finished.parent_join_set,
648 event: JoinSetResponse::ChildExecutionFinished {
649 child_execution_id: derived,
650 finished_version: self.version,
652 result: child_finished.result,
653 },
654 },
655 },
656 };
657
658 db_exec
659 .append_batch_respond_to_parent(events, response, self.created_at)
660 .await?;
661 } else {
662 db_exec
663 .append(self.execution_id, self.version, self.primary_event)
664 .await?;
665 }
666 Ok(())
667 }
668}
669
670#[cfg(any(test, feature = "test"))]
671pub mod simple_worker {
672 use crate::worker::{Worker, WorkerContext, WorkerResult};
673 use async_trait::async_trait;
674 use concepts::{
675 FunctionFqn, FunctionMetadata, ParameterTypes, RETURN_TYPE_DUMMY,
676 storage::{HistoryEvent, Version},
677 };
678 use indexmap::IndexMap;
679 use std::sync::Arc;
680 use tracing::trace;
681
    /// Default function name used by [`SimpleWorker`], built from `"pkg/ifc"` and `"fn"`.
    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");
    /// Shared, ordered script of worker runs: for each expected version, the expected event
    /// history and the result to return. `SimpleWorker::run` consumes it from the end.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;
685
    /// Test worker that replays scripted results instead of running real code.
    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        /// Scripted runs; `run` pops the last entry on every call.
        pub worker_results_rev: SimpleWorkerResultMap,
        /// Name of the single function this worker exports.
        pub ffqn: FunctionFqn,
        // Metadata returned from `exported_functions`.
        exported: [FunctionMetadata; 1],
    }
692
693 impl SimpleWorker {
694 #[must_use]
695 pub fn with_single_result(res: WorkerResult) -> Self {
696 Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(IndexMap::from([(
697 Version::new(2),
698 (vec![], res),
699 )]))))
700 }
701
702 #[must_use]
703 pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
704 Self {
705 worker_results_rev: self.worker_results_rev,
706 exported: [FunctionMetadata {
707 ffqn: ffqn.clone(),
708 parameter_types: ParameterTypes::default(),
709 return_type: RETURN_TYPE_DUMMY,
710 extension: None,
711 submittable: true,
712 }],
713 ffqn,
714 }
715 }
716
717 #[must_use]
718 pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
719 Self {
720 worker_results_rev,
721 ffqn: FFQN_SOME,
722 exported: [FunctionMetadata {
723 ffqn: FFQN_SOME,
724 parameter_types: ParameterTypes::default(),
725 return_type: RETURN_TYPE_DUMMY,
726 extension: None,
727 submittable: true,
728 }],
729 }
730 }
731 }
732
733 #[async_trait]
734 impl Worker for SimpleWorker {
735 async fn run(&self, ctx: WorkerContext) -> WorkerResult {
736 let (expected_version, (expected_eh, worker_result)) =
737 self.worker_results_rev.lock().unwrap().pop().unwrap();
738 trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
739 assert_eq!(expected_version, ctx.version);
740 assert_eq!(expected_eh, ctx.event_history);
741 worker_result
742 }
743
744 fn exported_functions(&self) -> &[FunctionMetadata] {
745 &self.exported
746 }
747 }
748}
749
750#[cfg(test)]
751mod tests {
752 use self::simple_worker::SimpleWorker;
753 use super::*;
754 use crate::{expired_timers_watcher, worker::WorkerResult};
755 use assert_matches::assert_matches;
756 use async_trait::async_trait;
757 use concepts::storage::DbPoolCloseable;
758 use concepts::storage::{CreateRequest, DbConnection, JoinSetRequest};
759 use concepts::storage::{ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState};
760 use concepts::time::Now;
761 use concepts::{
762 ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, RETURN_TYPE_DUMMY,
763 SUPPORTED_RETURN_VALUE_OK_EMPTY, StrVariant, SupportedFunctionReturnValue, TrapKind,
764 };
765 use db_tests::Database;
766 use indexmap::IndexMap;
767 use simple_worker::FFQN_SOME;
768 use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
769 use test_utils::set_up;
770 use test_utils::sim_clock::{ConstClock, SimClock};
771
    /// Function name built from `"pkg/ifc"` and `"fn-child"`.
    // NOTE(review): presumably used by the parent/child tests further down the file — confirm.
    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
773
774 async fn tick_fn<W: Worker + Debug, C: ClockFn + 'static>(
775 config: ExecConfig,
776 clock_fn: C,
777 db_exec: Arc<dyn DbExecutor>,
778 worker: Arc<W>,
779 executed_at: DateTime<Utc>,
780 ) -> Vec<ExecutionId> {
781 trace!("Ticking with {worker:?}");
782 let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
783 let executor = ExecTask::new(worker, config, clock_fn, db_exec, ffqns);
784 executor
785 .tick_test_await(executed_at, RunId::generate())
786 .await
787 }
788
    /// Runs the simple lifecycle scenario against the in-memory database with a constant clock.
    #[tokio::test]
    async fn execute_simple_lifecycle_tick_based_mem() {
        let created_at = Now.now();
        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
        execute_simple_lifecycle_tick_based(
            db_pool.connection().as_ref(),
            db_exec.clone(),
            ConstClock(created_at),
        )
        .await;
        db_close.close().await;
    }
801
    /// Runs the simple lifecycle scenario against the SQLite database with a constant clock.
    #[tokio::test]
    async fn execute_simple_lifecycle_tick_based_sqlite() {
        let created_at = Now.now();
        let (_guard, db_pool, db_exec, db_close) = Database::Sqlite.set_up().await;
        execute_simple_lifecycle_tick_based(
            db_pool.connection().as_ref(),
            db_exec.clone(),
            ConstClock(created_at),
        )
        .await;
        db_close.close().await;
    }
814
815 async fn execute_simple_lifecycle_tick_based<C: ClockFn + 'static>(
816 db_connection: &dyn DbConnection,
817 db_exec: Arc<dyn DbExecutor>,
818 clock_fn: C,
819 ) {
820 set_up();
821 let created_at = clock_fn.now();
822 let exec_config = ExecConfig {
823 batch_size: 1,
824 lock_expiry: Duration::from_secs(1),
825 tick_sleep: Duration::from_millis(100),
826 component_id: ComponentId::dummy_activity(),
827 task_limiter: None,
828 executor_id: ExecutorId::generate(),
829 };
830
831 let execution_log = create_and_tick(
832 CreateAndTickConfig {
833 execution_id: ExecutionId::generate(),
834 created_at,
835 max_retries: 0,
836 executed_at: created_at,
837 retry_exp_backoff: Duration::ZERO,
838 },
839 clock_fn,
840 db_connection,
841 db_exec,
842 exec_config,
843 Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
844 SUPPORTED_RETURN_VALUE_OK_EMPTY,
845 Version::new(2),
846 None,
847 ))),
848 tick_fn,
849 )
850 .await;
851 assert_matches!(
852 execution_log.events.get(2).unwrap(),
853 ExecutionEvent {
854 event: ExecutionEventInner::Finished {
855 result: SupportedFunctionReturnValue::Ok { ok: None },
856 http_client_traces: None
857 },
858 created_at: _,
859 backtrace_id: None,
860 }
861 );
862 }
863
    /// Creates an execution in the in-memory database, runs it with a worker that succeeds
    /// and checks that the third event in the log is `Finished` with an empty `Ok` result.
    // NOTE(review): despite the `task_based` name, the executor is driven through `tick_fn`
    // here, not through a spawned executor task.
    #[tokio::test]
    async fn execute_simple_lifecycle_task_based_mem() {
        set_up();
        let created_at = Now.now();
        let clock_fn = ConstClock(created_at);
        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
        };

        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
            SUPPORTED_RETURN_VALUE_OK_EMPTY,
            Version::new(2),
            None,
        )));

        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            db_pool.connection().as_ref(),
            db_exec,
            exec_config,
            worker,
            tick_fn,
        )
        .await;
        assert_matches!(
            execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: SupportedFunctionReturnValue::Ok { ok: None },
                    http_client_traces: None
                },
                created_at: _,
                backtrace_id: None,
            }
        );
        db_close.close().await;
    }
914
    /// Parameters of the `create_and_tick` helper.
    struct CreateAndTickConfig {
        /// Id of the execution to create.
        execution_id: ExecutionId,
        /// Used as both the creation and the scheduling timestamp of the execution.
        created_at: DateTime<Utc>,
        max_retries: u32,
        /// Timestamp passed to the tick function.
        executed_at: DateTime<Utc>,
        retry_exp_backoff: Duration,
    }
922
    /// Creates an execution of `FFQN_SOME`, invokes `tick` once and returns the resulting
    /// execution log.
    ///
    /// Before returning, asserts that the log starts with a `Created` event at
    /// `config.created_at`, followed by a `Locked` event not earlier than that, followed by a
    /// third event not earlier than the lock. Panics if any database call fails or fewer than
    /// three events exist.
    async fn create_and_tick<
        W: Worker,
        C: ClockFn,
        T: FnMut(ExecConfig, C, Arc<dyn DbExecutor>, Arc<W>, DateTime<Utc>) -> F,
        F: Future<Output = Vec<ExecutionId>>,
    >(
        config: CreateAndTickConfig,
        clock_fn: C,
        db_connection: &dyn DbConnection,
        db_exec: Arc<dyn DbExecutor>,
        exec_config: ExecConfig,
        worker: Arc<W>,
        mut tick: T,
    ) -> ExecutionLog {
        // Create a top-level execution scheduled for immediate execution.
        db_connection
            .create(CreateRequest {
                created_at: config.created_at,
                execution_id: config.execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: config.created_at,
                retry_exp_backoff: config.retry_exp_backoff,
                max_retries: config.max_retries,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();
        // Execute; the returned ids are not needed here.
        tick(exec_config, clock_fn, db_exec, worker, config.executed_at).await;
        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
        debug!("Execution history after tick: {execution_log:?}");
        // Event 0: created at exactly the requested time.
        assert_matches!(
            execution_log.events.first().unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Created { .. },
                created_at: actually_created_at,
                backtrace_id: None,
            }
            if config.created_at == *actually_created_at
        );
        // Event 1: locked, not before the creation.
        let locked_at = assert_matches!(
            execution_log.events.get(1).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: locked_at,
                backtrace_id: None,
            } if config.created_at <= *locked_at
            => *locked_at
        );
        // Event 2: whatever the worker produced, not before the lock.
        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
            event: _,
            created_at: executed_at,
            backtrace_id: None,
        } if *executed_at >= locked_at);
        execution_log
    }
984
    /// An activity trap with one retry left must produce a `TemporarilyFailed` event, keep
    /// the execution from being picked up until the backoff expires, and let it finish on
    /// the next tick afterwards.
    #[tokio::test]
    async fn activity_trap_should_trigger_an_execution_retry() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
        };
        let expected_reason = "error reason";
        let expected_detail = "error detail";
        // First run: the worker traps.
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: Some(expected_detail.to_string()),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        let retry_exp_backoff = Duration::from_millis(100);
        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at: sim_clock.now(),
                max_retries: 1,
                executed_at: sim_clock.now(),
                retry_exp_backoff,
            },
            sim_clock.clone(),
            db_pool.connection().as_ref(),
            db_exec.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        // Created, Locked, TemporarilyFailed.
        assert_eq!(3, execution_log.events.len());
        {
            let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
                &execution_log.events.get(2).unwrap(),
                ExecutionEvent {
                    event: ExecutionEventInner::TemporarilyFailed {
                        reason_inner,
                        reason_full,
                        detail,
                        backoff_expires_at,
                        http_client_traces: None,
                    },
                    created_at: at,
                    backtrace_id: None,
                }
                => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
            );
            assert_eq!(expected_reason, reason_inner.deref());
            assert_eq!(
                format!("activity trap: {expected_reason}"),
                reason_full.deref()
            );
            assert_eq!(Some(expected_detail), detail.as_deref());
            assert_eq!(at, sim_clock.now());
            assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
        }
        // Second run: the worker succeeds; it expects version 4 because two more events
        // (the failure and the second lock) precede it.
        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
            std::sync::Mutex::new(IndexMap::from([(
                Version::new(4),
                (
                    vec![],
                    WorkerResult::Ok(SUPPORTED_RETURN_VALUE_OK_EMPTY, Version::new(4), None),
                ),
            )])),
        )));
        // Nothing must be executed until `retry_exp_backoff` expires.
        assert!(
            tick_fn(
                exec_config.clone(),
                sim_clock.clone(),
                db_exec.clone(),
                worker.clone(),
                sim_clock.now(),
            )
            .await
            .is_empty()
        );
        sim_clock.move_time_forward(retry_exp_backoff);
        // Tick again to finish the execution.
        tick_fn(
            exec_config,
            sim_clock.clone(),
            db_exec.clone(),
            worker,
            sim_clock.now(),
        )
        .await;
        let execution_log = {
            let db_connection = db_pool.connection();
            db_connection
                .get(&execution_log.execution_id)
                .await
                .unwrap()
        };
        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
        assert_matches!(
            execution_log.events.get(3).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: at,
                backtrace_id: None,
            } if *at == sim_clock.now()
        );
        assert_matches!(
            execution_log.events.get(4).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: SupportedFunctionReturnValue::Ok{ok:None},
                    http_client_traces: None
                },
                created_at: finished_at,
                backtrace_id: None,
            } if *finished_at == sim_clock.now()
        );
        db_close.close().await;
    }
1113
    /// An activity trap with `max_retries: 0` must finish the execution right away with a
    /// `PermanentFailure` of kind `ActivityTrap` carrying the original reason and detail.
    #[tokio::test]
    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
        set_up();
        let created_at = Now.now();
        let clock_fn = ConstClock(created_at);
        let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
            executor_id: ExecutorId::generate(),
        };

        let expected_reason = "error reason";
        let expected_detail = "error detail";
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: Some(expected_detail.to_string()),
                version: Version::new(2),
                http_client_traces: None,
            },
        )));
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            db_pool.connection().as_ref(),
            db_exec,
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        // Created, Locked, Finished.
        assert_eq!(3, execution_log.events.len());
        let (reason, kind, detail) = assert_matches!(
            &execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished{
                    result: SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_}),
                    http_client_traces: None
                },
                created_at: at,
                backtrace_id: None,
            } if *at == created_at
            => (reason_inner, kind, detail)
        );
        assert_eq!(expected_reason, *reason);
        assert_eq!(Some(expected_detail), detail.as_deref());
        assert_eq!(PermanentFailureKind::ActivityTrap, *kind);

        db_close.close().await;
    }
1175
1176 #[tokio::test]
1177 async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1178 let worker_error = WorkerError::ActivityTrap {
1179 reason: "error reason".to_string(),
1180 trap_kind: TrapKind::Trap,
1181 detail: Some("detail".to_string()),
1182 version: Version::new(2),
1183 http_client_traces: None,
1184 };
1185 let expected_child_err = FinishedExecutionError::PermanentFailure {
1186 reason_full: "activity trap: error reason".to_string(),
1187 reason_inner: "error reason".to_string(),
1188 kind: PermanentFailureKind::ActivityTrap,
1189 detail: Some("detail".to_string()),
1190 };
1191 child_execution_permanently_failed_should_notify_parent(
1192 WorkerResult::Err(worker_error),
1193 expected_child_err,
1194 )
1195 .await;
1196 }
1197
1198 #[tokio::test]
1201 async fn child_execution_permanently_failed_handled_by_watcher_should_notify_parent_timeout() {
1202 let expected_child_err = FinishedExecutionError::PermanentTimeout;
1203 child_execution_permanently_failed_should_notify_parent(
1204 WorkerResult::DbUpdatedByWorkerOrWatcher,
1205 expected_child_err,
1206 )
1207 .await;
1208 }
1209
1210 async fn child_execution_permanently_failed_should_notify_parent(
1211 worker_result: WorkerResult,
1212 expected_child_err: FinishedExecutionError,
1213 ) {
1214 use concepts::storage::JoinSetResponseEventOuter;
1215 const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1216
1217 set_up();
1218 let sim_clock = SimClock::default();
1219 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1220
1221 let parent_worker = Arc::new(SimpleWorker::with_single_result(
1222 WorkerResult::DbUpdatedByWorkerOrWatcher,
1223 ));
1224 let parent_execution_id = ExecutionId::generate();
1225 db_pool
1226 .connection()
1227 .create(CreateRequest {
1228 created_at: sim_clock.now(),
1229 execution_id: parent_execution_id.clone(),
1230 ffqn: FFQN_SOME,
1231 params: Params::empty(),
1232 parent: None,
1233 metadata: concepts::ExecutionMetadata::empty(),
1234 scheduled_at: sim_clock.now(),
1235 retry_exp_backoff: Duration::ZERO,
1236 max_retries: 0,
1237 component_id: ComponentId::dummy_activity(),
1238 scheduled_by: None,
1239 })
1240 .await
1241 .unwrap();
1242 tick_fn(
1243 ExecConfig {
1244 batch_size: 1,
1245 lock_expiry: LOCK_EXPIRY,
1246 tick_sleep: Duration::ZERO,
1247 component_id: ComponentId::dummy_activity(),
1248 task_limiter: None,
1249 executor_id: ExecutorId::generate(),
1250 },
1251 sim_clock.clone(),
1252 db_exec.clone(),
1253 parent_worker,
1254 sim_clock.now(),
1255 )
1256 .await;
1257
1258 let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1259 let child_execution_id = parent_execution_id.next_level(&join_set_id);
1260 {
1262 let child = CreateRequest {
1263 created_at: sim_clock.now(),
1264 execution_id: ExecutionId::Derived(child_execution_id.clone()),
1265 ffqn: FFQN_CHILD,
1266 params: Params::empty(),
1267 parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1268 metadata: concepts::ExecutionMetadata::empty(),
1269 scheduled_at: sim_clock.now(),
1270 retry_exp_backoff: Duration::ZERO,
1271 max_retries: 0,
1272 component_id: ComponentId::dummy_activity(),
1273 scheduled_by: None,
1274 };
1275 let current_time = sim_clock.now();
1276 let join_set = AppendRequest {
1277 created_at: current_time,
1278 event: ExecutionEventInner::HistoryEvent {
1279 event: HistoryEvent::JoinSetCreate {
1280 join_set_id: join_set_id.clone(),
1281 closing_strategy: ClosingStrategy::Complete,
1282 },
1283 },
1284 };
1285 let child_exec_req = AppendRequest {
1286 created_at: current_time,
1287 event: ExecutionEventInner::HistoryEvent {
1288 event: HistoryEvent::JoinSetRequest {
1289 join_set_id: join_set_id.clone(),
1290 request: JoinSetRequest::ChildExecutionRequest {
1291 child_execution_id: child_execution_id.clone(),
1292 },
1293 },
1294 },
1295 };
1296 let join_next = AppendRequest {
1297 created_at: current_time,
1298 event: ExecutionEventInner::HistoryEvent {
1299 event: HistoryEvent::JoinNext {
1300 join_set_id: join_set_id.clone(),
1301 run_expires_at: sim_clock.now(),
1302 closing: false,
1303 requested_ffqn: Some(FFQN_CHILD),
1304 },
1305 },
1306 };
1307 db_pool
1308 .connection()
1309 .append_batch_create_new_execution(
1310 current_time,
1311 vec![join_set, child_exec_req, join_next],
1312 parent_execution_id.clone(),
1313 Version::new(2),
1314 vec![child],
1315 )
1316 .await
1317 .unwrap();
1318 }
1319
1320 let child_worker =
1321 Arc::new(SimpleWorker::with_single_result(worker_result).with_ffqn(FFQN_CHILD));
1322
1323 tick_fn(
1325 ExecConfig {
1326 batch_size: 1,
1327 lock_expiry: LOCK_EXPIRY,
1328 tick_sleep: Duration::ZERO,
1329 component_id: ComponentId::dummy_activity(),
1330 task_limiter: None,
1331 executor_id: ExecutorId::generate(),
1332 },
1333 sim_clock.clone(),
1334 db_exec.clone(),
1335 child_worker,
1336 sim_clock.now(),
1337 )
1338 .await;
1339 if matches!(expected_child_err, FinishedExecutionError::PermanentTimeout) {
1340 sim_clock.move_time_forward(LOCK_EXPIRY);
1342 expired_timers_watcher::tick(db_pool.connection().as_ref(), sim_clock.now())
1343 .await
1344 .unwrap();
1345 }
1346 let child_log = db_pool
1347 .connection()
1348 .get(&ExecutionId::Derived(child_execution_id.clone()))
1349 .await
1350 .unwrap();
1351 assert!(child_log.pending_state.is_finished());
1352 assert_eq!(
1353 Version(2),
1354 child_log.next_version,
1355 "created = 0, locked = 1, with_single_result = 2"
1356 );
1357 assert_eq!(
1358 ExecutionEventInner::Finished {
1359 result: SupportedFunctionReturnValue::ExecutionError(expected_child_err),
1360 http_client_traces: None
1361 },
1362 child_log.last_event().event
1363 );
1364 let parent_log = db_pool
1365 .connection()
1366 .get(&parent_execution_id)
1367 .await
1368 .unwrap();
1369 assert_matches!(
1370 parent_log.pending_state,
1371 PendingState::PendingAt {
1372 scheduled_at
1373 } if scheduled_at == sim_clock.now(),
1374 "parent should be back to pending"
1375 );
1376 let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1377 parent_log.responses.last(),
1378 Some(JoinSetResponseEventOuter{
1379 created_at: at,
1380 event: JoinSetResponseEvent{
1381 join_set_id: found_join_set_id,
1382 event: JoinSetResponse::ChildExecutionFinished {
1383 child_execution_id: found_child_execution_id,
1384 finished_version,
1385 result: found_result,
1386 }
1387 }
1388 })
1389 if *at == sim_clock.now()
1390 => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1391 );
1392 assert_eq!(join_set_id, *found_join_set_id);
1393 assert_eq!(child_execution_id, *found_child_execution_id);
1394 assert_eq!(child_log.next_version, *child_finished_version);
1395 assert_matches!(
1396 found_result,
1397 SupportedFunctionReturnValue::ExecutionError(_)
1398 );
1399
1400 db_close.close().await;
1401 }
1402
1403 #[derive(Clone, Debug)]
1404 struct SleepyWorker {
1405 duration: Duration,
1406 result: SupportedFunctionReturnValue,
1407 exported: [FunctionMetadata; 1],
1408 }
1409
1410 #[async_trait]
1411 impl Worker for SleepyWorker {
1412 async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1413 tokio::time::sleep(self.duration).await;
1414 WorkerResult::Ok(self.result.clone(), ctx.version, None)
1415 }
1416
1417 fn exported_functions(&self) -> &[FunctionMetadata] {
1418 &self.exported
1419 }
1420 }
1421
1422 #[tokio::test]
1423 async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1424 set_up();
1425 let sim_clock = SimClock::default();
1426 let (_guard, db_pool, db_exec, db_close) = Database::Memory.set_up().await;
1427 let lock_expiry = Duration::from_millis(100);
1428 let exec_config = ExecConfig {
1429 batch_size: 1,
1430 lock_expiry,
1431 tick_sleep: Duration::ZERO,
1432 component_id: ComponentId::dummy_activity(),
1433 task_limiter: None,
1434 executor_id: ExecutorId::generate(),
1435 };
1436
1437 let worker = Arc::new(SleepyWorker {
1438 duration: lock_expiry + Duration::from_millis(1), result: SUPPORTED_RETURN_VALUE_OK_EMPTY,
1440 exported: [FunctionMetadata {
1441 ffqn: FFQN_SOME,
1442 parameter_types: ParameterTypes::default(),
1443 return_type: RETURN_TYPE_DUMMY,
1444 extension: None,
1445 submittable: true,
1446 }],
1447 });
1448 let execution_id = ExecutionId::generate();
1450 let timeout_duration = Duration::from_millis(300);
1451 let db_connection = db_pool.connection();
1452 db_connection
1453 .create(CreateRequest {
1454 created_at: sim_clock.now(),
1455 execution_id: execution_id.clone(),
1456 ffqn: FFQN_SOME,
1457 params: Params::empty(),
1458 parent: None,
1459 metadata: concepts::ExecutionMetadata::empty(),
1460 scheduled_at: sim_clock.now(),
1461 retry_exp_backoff: timeout_duration,
1462 max_retries: 1,
1463 component_id: ComponentId::dummy_activity(),
1464 scheduled_by: None,
1465 })
1466 .await
1467 .unwrap();
1468
1469 let ffqns = super::extract_exported_ffqns_noext(worker.as_ref());
1470 let executor = ExecTask::new(
1471 worker,
1472 exec_config,
1473 sim_clock.clone(),
1474 db_exec.clone(),
1475 ffqns,
1476 );
1477 let mut first_execution_progress = executor
1478 .tick(sim_clock.now(), RunId::generate())
1479 .await
1480 .unwrap();
1481 assert_eq!(1, first_execution_progress.executions.len());
1482 sim_clock.move_time_forward(lock_expiry);
1484 let now_after_first_lock_expiry = sim_clock.now();
1486 {
1487 debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1488 let cleanup_progress = executor
1489 .tick(now_after_first_lock_expiry, RunId::generate())
1490 .await
1491 .unwrap();
1492 assert!(cleanup_progress.executions.is_empty());
1493 }
1494 {
1495 let expired_locks = expired_timers_watcher::tick(
1496 db_pool.connection().as_ref(),
1497 now_after_first_lock_expiry,
1498 )
1499 .await
1500 .unwrap()
1501 .expired_locks;
1502 assert_eq!(1, expired_locks);
1503 }
1504 assert!(
1505 !first_execution_progress
1506 .executions
1507 .pop()
1508 .unwrap()
1509 .1
1510 .is_finished()
1511 );
1512
1513 let execution_log = db_connection.get(&execution_id).await.unwrap();
1514 let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1515 assert_matches!(
1516 &execution_log.events.get(2).unwrap(),
1517 ExecutionEvent {
1518 event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at, .. },
1519 created_at: at,
1520 backtrace_id: None,
1521 } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1522 );
1523 assert_eq!(
1524 PendingState::PendingAt {
1525 scheduled_at: expected_first_timeout_expiry
1526 },
1527 execution_log.pending_state
1528 );
1529 sim_clock.move_time_forward(timeout_duration);
1530 let now_after_first_timeout = sim_clock.now();
1531 debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1532
1533 let mut second_execution_progress = executor
1534 .tick(now_after_first_timeout, RunId::generate())
1535 .await
1536 .unwrap();
1537 assert_eq!(1, second_execution_progress.executions.len());
1538
1539 sim_clock.move_time_forward(lock_expiry);
1541 let now_after_second_lock_expiry = sim_clock.now();
1543 debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1544 {
1545 let cleanup_progress = executor
1546 .tick(now_after_second_lock_expiry, RunId::generate())
1547 .await
1548 .unwrap();
1549 assert!(cleanup_progress.executions.is_empty());
1550 }
1551 {
1552 let expired_locks = expired_timers_watcher::tick(
1553 db_pool.connection().as_ref(),
1554 now_after_second_lock_expiry,
1555 )
1556 .await
1557 .unwrap()
1558 .expired_locks;
1559 assert_eq!(1, expired_locks);
1560 }
1561 assert!(
1562 !second_execution_progress
1563 .executions
1564 .pop()
1565 .unwrap()
1566 .1
1567 .is_finished()
1568 );
1569
1570 drop(db_connection);
1571 drop(executor);
1572 db_close.close().await;
1573 }
1574}