1use super::activity_ctx::{self, ActivityCtx};
2use crate::activity::cancel_registry::CancelRegistry;
3use crate::component_logger::{LogStrageConfig, log_activities};
4use crate::envvar::EnvVar;
5use crate::http_hooks::ConfigSectionHint;
6use crate::std_output_stream::{StdOutputConfig, StdOutputConfigWithSender};
7use crate::{RunnableComponent, WasmFileError};
8use async_trait::async_trait;
9use chrono::{DateTime, Utc};
10use concepts::storage::http_client_trace::HttpClientTrace;
11use concepts::storage::{LogInfoAppendRow, LogStreamType, Version};
12use concepts::time::{ClockFn, Sleep, now_tokio_instant};
13use concepts::{
14 ComponentId, FunctionFqn, PackageIfcFns, Params, SupportedFunctionReturnValue, TrapKind,
15};
16use concepts::{FunctionMetadata, ResultParsingError};
17use executor::worker::{FatalError, RunFinished, WorkerContext, WorkerResult, WorkerResultOk};
18use executor::worker::{Worker, WorkerError};
19use itertools::Itertools;
20use std::sync::Arc;
21use std::time::Duration;
22use tokio::sync::mpsc;
23use tracing::{debug, info, trace};
24use utils::wasm_tools::ExIm;
25use wasmtime::component::{ComponentExportIndex, InstancePre, Type};
26use wasmtime::{Engine, component::Val};
27use wasmtime::{Store, UpdateDeadline};
28
29#[derive(Clone, Debug)]
30pub struct ActivityConfig {
31 pub component_id: ComponentId,
32 pub forward_stdout: Option<StdOutputConfig>,
33 pub forward_stderr: Option<StdOutputConfig>,
34 pub env_vars: Arc<[EnvVar]>,
35 pub fuel: Option<u64>,
36 pub allowed_hosts: Arc<[crate::http_request_policy::AllowedHostConfig]>,
37 pub config_section_hint: ConfigSectionHint,
39}
40
41#[derive(derive_more::Debug)]
42pub struct ActivityWorkerCompiled {
43 #[debug(skip)]
44 engine: Arc<Engine>,
45 #[debug(skip)]
46 instance_pre: InstancePre<ActivityCtx>,
47 exim: ExIm,
48 #[debug(skip)]
49 clock_fn: Box<dyn ClockFn>,
50 #[debug(skip)]
51 sleep: Arc<dyn Sleep>,
52 exported_ffqn_to_index: hashbrown::HashMap<FunctionFqn, ComponentExportIndex>,
53 config: ActivityConfig,
54}
55impl ActivityWorkerCompiled {
56 pub fn new_with_config(
57 runnable_component: RunnableComponent,
58 config: ActivityConfig,
59 engine: Arc<Engine>,
60 clock_fn: Box<dyn ClockFn>,
61 sleep: Arc<dyn Sleep>,
62 ) -> Result<Self, WasmFileError> {
63 let mut linker = wasmtime::component::Linker::new(&engine);
64 wasmtime_wasi::p2::add_to_linker_async(&mut linker)
66 .map_err(|err| WasmFileError::linking_error("cannot link wasi", err))?;
67 wasmtime_wasi_http::p2::add_only_http_to_linker_async(&mut linker)
69 .map_err(|err| WasmFileError::linking_error("cannot link wasi-http", err))?;
70 log_activities::obelisk::log::log::add_to_linker::<_, ActivityCtx>(&mut linker, |x| x)
72 .map_err(|err| WasmFileError::linking_error("cannot link obelisk:log", err))?;
73 let instance_pre = linker
75 .instantiate_pre(&runnable_component.wasmtime_component)
76 .map_err(|err| WasmFileError::linking_error("cannot link activity", err))?;
77
78 let exported_ffqn_to_index = RunnableComponent::index_exported_functions(
79 &runnable_component.wasmtime_component,
80 &runnable_component.wasm_component.exim,
81 )
82 .map_err(WasmFileError::DecodeError)?;
83 Ok(Self {
84 engine,
85 exim: runnable_component.wasm_component.exim,
86 clock_fn,
87 sleep,
88 exported_ffqn_to_index,
89 config,
90 instance_pre,
91 })
92 }
93
94 #[must_use]
95 pub fn exported_functions_ext(&self) -> &[FunctionMetadata] {
96 self.exim.get_exports(true)
97 }
98
99 #[must_use]
100 pub fn exports_hierarchy_ext(&self) -> &[PackageIfcFns] {
101 self.exim.get_exports_hierarchy_ext()
102 }
103
104 #[must_use]
105 pub fn imported_functions(&self) -> &[FunctionMetadata] {
106 &self.exim.imports_flat
107 }
108
109 #[must_use]
110 pub fn into_worker(
111 self,
112 cancel_registry: CancelRegistry,
113 log_forwarder_sender: &mpsc::Sender<LogInfoAppendRow>,
114 logs_storage_config: Option<LogStrageConfig>,
115 ) -> ActivityWorker {
116 let stdout = StdOutputConfigWithSender::new(
117 self.config.forward_stdout,
118 log_forwarder_sender,
119 LogStreamType::StdOut,
120 );
121 let stderr = StdOutputConfigWithSender::new(
122 self.config.forward_stderr,
123 log_forwarder_sender,
124 LogStreamType::StdErr,
125 );
126 ActivityWorker {
127 engine: self.engine,
128 instance_pre: self.instance_pre,
129 exim: self.exim,
130 clock_fn: self.clock_fn,
131 sleep: self.sleep,
132 exported_ffqn_to_index: self.exported_ffqn_to_index,
133 config: self.config,
134 cancel_registry,
135 stdout,
136 stderr,
137 logs_storage_config,
138 }
139 }
140}
141
142pub struct ActivityWorker {
143 engine: Arc<Engine>,
144 instance_pre: InstancePre<ActivityCtx>,
145 exim: ExIm,
146 clock_fn: Box<dyn ClockFn>,
147 sleep: Arc<dyn Sleep>,
148 exported_ffqn_to_index: hashbrown::HashMap<FunctionFqn, ComponentExportIndex>,
149 config: ActivityConfig,
150 cancel_registry: CancelRegistry,
151 stdout: Option<StdOutputConfigWithSender>,
152 stderr: Option<StdOutputConfigWithSender>,
153 logs_storage_config: Option<LogStrageConfig>,
154}
155
156impl ActivityWorker {
157 #[must_use]
158 pub fn exported_functions_ext(&self) -> &[FunctionMetadata] {
159 self.exim.get_exports(true)
160 }
161
162 #[must_use]
163 pub fn exports_hierarchy_ext(&self) -> &[PackageIfcFns] {
164 self.exim.get_exports_hierarchy_ext()
165 }
166
167 #[must_use]
168 pub fn imported_functions(&self) -> &[FunctionMetadata] {
169 &self.exim.imports_flat
170 }
171}
172
173#[async_trait]
174impl Worker for ActivityWorker {
175 fn exported_functions_noext(&self) -> &[FunctionMetadata] {
176 self.exim.get_exports(false)
177 }
178
179 async fn run(&self, ctx: WorkerContext) -> WorkerResult {
180 trace!("Context: {ctx:?}");
181 assert!(ctx.event_history.is_empty());
182 let cancelation_token = self
183 .cancel_registry
184 .obtain_cancellation_token(ctx.execution_id.clone());
185
186 let started_at = self.clock_fn.now();
187 ctx.worker_span.record(
188 "execution_deadline",
189 tracing::field::display(&ctx.locked_event.lock_expires_at),
190 );
191
192 let ffqn = ctx.ffqn.clone();
193 let params = ctx.params.clone();
194 let version = ctx.version.clone();
195 let worker_span = ctx.worker_span.clone();
196
197 let (mut store, deadline_duration) = match self.create_store(ctx, started_at) {
198 Ok(store) => store,
199 Err(err) => return WorkerResult::Err(err),
200 };
201
202 let stopwatch_for_reporting = now_tokio_instant(); let call_function = {
205 let call_func_params = match self
206 .call_func_params(&ffqn, ¶ms, &version, &mut store)
207 .await
208 {
209 Ok(ok) => ok,
210 Err(err) => return WorkerResult::Err(err),
211 };
212 self.call_func(&mut store, call_func_params)
213 };
214
215 tokio::select! { res = call_function => {
217 let activity_ctx = store.into_data();
218 let res = self.process_res(res, &version, activity_ctx);
219 worker_span.in_scope(|| {
220 match &res {
221 Ok(worker_res_ok) => {
222 info!(duration = ?stopwatch_for_reporting.elapsed(), "Run finished: {worker_res_ok}");
223 }
224 Err(WorkerError::ExecutorClosing(_)) => {
225 info!("Executor closing");
226 }
227 Err(err) => {
228 info!(%err, duration = ?stopwatch_for_reporting.elapsed(), "Run finished with an error");
229 }
230 }
231 });
232 return res;
233 },
234 () = self.sleep.sleep(deadline_duration) => {
235 let activity_ctx = store.into_data();
236 worker_span.in_scope(||
237 info!(duration = ?stopwatch_for_reporting.elapsed(), %started_at,
238 now = %self.clock_fn.now(),
239 "Run timed out")
240 );
241 let http_client_traces = Some(activity_ctx.http_hooks.http_client_traces
242 .into_iter()
243 .map(|(req, mut resp)| HttpClientTrace {
244 req,
245 resp: resp.try_recv().ok(),
246 })
247 .collect_vec());
248 return WorkerResult::Err(WorkerError::TemporaryTimeout{
249 http_client_traces,
250 version,
251 });
252 }
253 _signal = cancelation_token => {
254 debug!("Activity run interrupted, DB must have been updated");
258 return WorkerResult::Ok(WorkerResultOk::DbUpdatedByWorkerOrWatcher);
259 }
260 }
261 }
262}
263
264struct CallFuncParams {
265 func: wasmtime::component::Func,
266 params: Arc<[Val]>,
267 result_type: Type,
268}
269
270impl ActivityWorker {
271 fn create_store(
272 &self,
273 ctx: WorkerContext,
274 started_at: DateTime<Utc>,
275 ) -> Result<(Store<ActivityCtx>, Duration ), WorkerError> {
276 let lock_expires_at = ctx.locked_event.lock_expires_at;
277 let worker_span = ctx.worker_span.clone();
278 let version = ctx.version.clone();
279
280 let stdout = self
281 .stdout
282 .as_ref()
283 .map(|it| it.build(&ctx.execution_id, ctx.locked_event.run_id));
284 let stderr = self
285 .stderr
286 .as_ref()
287 .map(|it| it.build(&ctx.execution_id, ctx.locked_event.run_id));
288
289 let mut store = activity_ctx::store(
290 &self.engine,
291 ctx,
292 &self.config,
293 self.clock_fn.clone_box(),
294 stdout,
295 stderr,
296 self.logs_storage_config.clone(),
297 );
298
299 if let Some(fuel) = self.config.fuel {
301 store
302 .set_fuel(fuel)
303 .expect("engine must have `consume_fuel` enabled");
304 }
305
306 store.epoch_deadline_callback(|store_ctx| {
308 let executor_closing = *store_ctx.data().executor_close_watcher.borrow();
309 if executor_closing {
310 info!("Executor closing");
311 Ok(UpdateDeadline::Interrupt) } else {
313 Ok(UpdateDeadline::YieldCustom(
314 1,
315 Box::pin(tokio::task::yield_now()),
316 ))
317 }
318 });
319
320 let deadline_delta = lock_expires_at - started_at;
321 let Ok(deadline_duration) = deadline_delta.to_std() else {
322 worker_span.in_scope(|| {
323 info!(execution_deadline = %lock_expires_at, %started_at,
324 "Timed out - started_at later than execution_deadline");
325 });
326 return Err(WorkerError::TemporaryTimeout {
327 http_client_traces: None,
328 version,
329 });
330 };
331 worker_span.record(
332 "deadline_duration",
333 tracing::field::debug(&deadline_duration),
334 );
335 Ok((store, deadline_duration))
336 }
337
338 async fn call_func_params(
339 &self,
340 ffqn: &FunctionFqn,
341 params: &Params,
342 version: &Version,
343 store: &mut Store<ActivityCtx>,
344 ) -> Result<CallFuncParams, WorkerError> {
345 let instance = match self.instance_pre.instantiate_async(&mut *store).await {
346 Ok(instance) => instance,
347 Err(err) => {
348 let reason = err.to_string();
349 if reason.starts_with("maximum concurrent") {
350 return Err(WorkerError::LimitReached {
351 reason,
352 version: version.clone(),
353 });
354 }
355 return Err(WorkerError::FatalError(
356 FatalError::CannotInstantiate {
357 reason: format!("cannot instantiate: {err}"),
358 detail: Some(format!("{err:?}")),
359 },
360 version.clone(),
361 ));
362 }
363 };
364 let func = {
365 let fn_export_index = self
366 .exported_ffqn_to_index
367 .get(ffqn)
368 .expect("executor only calls `run` with ffqns that are exported");
369 instance
370 .get_func(&mut *store, fn_export_index)
371 .expect("exported function found with wit-parser but not with wasmtime")
372 };
373
374 let component_func = func.ty(store);
375 let params = match params.as_vals(component_func.params()) {
376 Ok(params) => params,
377 Err(err) => {
378 return Err(WorkerError::FatalError(
379 FatalError::ParamsParsingError(err),
380 version.clone(),
381 ));
382 }
383 };
384
385 let result_types = component_func.results().collect::<Vec<_>>(); assert!(
387 result_types.len() == 1,
388 "multi-value and void results are not supported, must have been checked in function registry"
389 );
390
391 Ok(CallFuncParams {
392 func,
393 params,
394 result_type: result_types
395 .into_iter()
396 .next()
397 .expect("just checked that size == 1"),
398 })
399 }
400
401 async fn call_func(
402 &self,
403 store: &mut Store<ActivityCtx>,
404 CallFuncParams {
405 func,
406 params,
407 result_type,
408 }: CallFuncParams,
409 ) -> Result<Result<SupportedFunctionReturnValue, ResultParsingError>, wasmtime::Error> {
410 let mut results = vec![Val::Bool(false)];
411 let res = func
412 .call_async(&mut *store, ¶ms, &mut results)
413 .await
414 .map(|()| {
415 (
416 results.into_iter().next().expect("results size is 1"),
417 result_type,
418 )
419 });
420 res.map(|(val, r#type)| SupportedFunctionReturnValue::new(val, r#type))
421 }
422
423 fn process_res(
424 &self,
425 res: Result<Result<SupportedFunctionReturnValue, ResultParsingError>, wasmtime::Error>,
426 version: &Version,
427 activity_ctx: ActivityCtx,
428 ) -> WorkerResult {
429 let http_client_traces = Some(
430 activity_ctx
431 .http_hooks
432 .http_client_traces
433 .into_iter()
434 .map(|(req, mut resp)| HttpClientTrace {
435 req,
436 resp: resp.try_recv().ok(),
437 })
438 .collect_vec(),
439 );
440 match res {
441 Ok(Ok(result)) => WorkerResult::Ok(WorkerResultOk::RunFinished(RunFinished {
442 retval: result,
443 version: version.clone(),
444 http_client_traces,
445 })),
446 Ok(Err(result_parsing_err)) => WorkerResult::Err(WorkerError::FatalError(
447 FatalError::ResultParsingError(result_parsing_err),
448 version.clone(),
449 )),
450 Err(err) => WorkerResult::Err(
451 if let Some(trap) = err
452 .source()
453 .and_then(|source| source.downcast_ref::<wasmtime::Trap>())
454 {
455 if *trap == wasmtime::Trap::OutOfFuel {
456 WorkerError::ActivityTrap {
457 reason: format!(
458 "total fuel consumed: {}",
459 self.config
460 .fuel
461 .expect("must have been set as it was the reason of trap")
462 ),
463 detail: None,
464 trap_kind: TrapKind::OutOfFuel,
465 version: version.clone(),
466 http_client_traces,
467 }
468 } else if *trap == wasmtime::Trap::Interrupt {
469 WorkerError::ExecutorClosing(version.clone())
470 } else {
471 WorkerError::ActivityTrap {
472 reason: trap.to_string(),
473 detail: Some(format!("{err:?}")),
474 trap_kind: TrapKind::Trap,
475 version: version.clone(),
476 http_client_traces,
477 }
478 }
479 } else {
480 WorkerError::ActivityTrap {
481 reason: err.to_string(),
482 trap_kind: TrapKind::HostFunctionError,
483 detail: Some(format!("{err:?}")),
484 version: version.clone(),
485 http_client_traces,
486 }
487 },
488 ),
489 }
490 }
491}
492
493#[cfg(any(test, feature = "test"))]
494pub mod test {
495 use concepts::{ComponentId, ComponentType, StrVariant, component_id::ComponentDigest};
496 use utils::sha256sum::calculate_sha256_file;
497 use wasmtime::Engine;
498
499 use crate::{
500 RunnableComponent,
501 engines::{EngineConfig, Engines},
502 };
503
504 pub async fn compile_activity(wasm_path: &str) -> (RunnableComponent, ComponentId) {
505 let engine = Engines::get_activity_engine_test(EngineConfig::on_demand_testing()).unwrap();
506 compile_activity_with_engine(wasm_path, &engine, ComponentType::Activity).await
507 }
508
509 #[allow(dead_code)] pub(crate) async fn compile_activity_stub(wasm_path: &str) -> (RunnableComponent, ComponentId) {
511 let engine = Engines::get_activity_engine_test(EngineConfig::on_demand_testing()).unwrap();
512 compile_activity_with_engine(wasm_path, &engine, ComponentType::ActivityStub).await
513 }
514
515 pub(crate) async fn compile_activity_with_engine(
516 wasm_path: &str,
517 engine: &Engine,
518 component_type: ComponentType,
519 ) -> (RunnableComponent, ComponentId) {
520 assert!(component_type.is_activity());
521 let file_digest = calculate_sha256_file(wasm_path).await.unwrap();
522 let component_id = ComponentId::new(
523 component_type,
524 StrVariant::empty(),
525 ComponentDigest(file_digest.0),
526 )
527 .unwrap();
528 (
529 RunnableComponent::new(wasm_path, engine, component_type).unwrap(),
530 component_id,
531 )
532 }
533}
534
535#[cfg(test)]
536pub(crate) mod tests {
537 use super::*;
538 use crate::activity::activity_worker::test::compile_activity_with_engine;
539 use crate::engines::PoolingOptions;
540 use crate::engines::{EngineConfig, Engines};
541 use crate::http_hooks::ConfigSectionHint;
542 use crate::http_request_policy::{AllowedHostConfig, HostPattern, MethodsPattern};
543 use assert_matches::assert_matches;
544 use concepts::prefixed_ulid::{DEPLOYMENT_ID_DUMMY, RunId};
545 use concepts::storage::Locked;
546 use concepts::storage::http_client_trace::{RequestTrace, ResponseTrace};
547 use concepts::storage::{DbPool, TimeoutOutcome};
548 use concepts::storage::{ExecutionRequest, Version};
549 use concepts::time::Now;
550 use concepts::time::TokioSleep;
551 use concepts::{ComponentRetryConfig, ComponentType};
552 use concepts::{
553 ExecutionFailureKind, FinishedExecutionFailure, SUPPORTED_RETURN_VALUE_OK_EMPTY,
554 };
555 use concepts::{
556 ExecutionId, FunctionFqn, Params, SupportedFunctionReturnValue, prefixed_ulid::ExecutorId,
557 storage::CreateRequest, storage::DbPoolCloseable,
558 };
559 use db_tests::Database;
560 use executor::executor::LockingStrategy;
561 use executor::executor::{ExecConfig, ExecTask};
562 use insta::assert_json_snapshot;
563 use rstest::rstest;
564 use serde_json::json;
565 use std::future;
566 use std::time::Duration;
567 use test_utils::env_or_default;
568 use test_utils::sim_clock::SimClock;
569 use tracing::{debug, info, info_span};
570 use val_json::{
571 type_wrapper::TypeWrapper,
572 wast_val::{WastVal, WastValWithType},
573 };
574
575 pub const SLEEP_LOOP_ACTIVITY_FFQN: FunctionFqn = FunctionFqn::new_static_tuple(
576 test_programs_sleep_activity_builder::exports::testing::sleep::sleep::SLEEP_LOOP,
577 ); pub const HTTP_GET_SUCCESSFUL_ACTIVITY: FunctionFqn = FunctionFqn::new_static_tuple(
579 test_programs_http_get_activity_builder::exports::testing::http::http_get::GET_SUCCESSFUL,
580 );
581
582 pub const FIBO_ACTIVITY_FFQN: FunctionFqn = FunctionFqn::new_static_tuple(
583 test_programs_fibo_activity_builder::exports::testing::fibo::fibo::FIBO,
584 ); pub const FIBO_10_INPUT: u8 = 10;
586 pub const FIBO_10_OUTPUT: u64 = 55;
587
588 fn activity_config(component_id: ComponentId) -> ActivityConfig {
589 ActivityConfig {
590 component_id,
591 forward_stdout: None,
592 forward_stderr: None,
593 env_vars: Arc::from([]),
594
595 fuel: None,
596 allowed_hosts: Arc::from([]),
597 config_section_hint: ConfigSectionHint::ActivityWasm,
598 }
599 }
600
601 pub(crate) fn activity_config_allowed_host(
602 component_id: ComponentId,
603 allowed_host: &str,
604 ) -> ActivityConfig {
605 ActivityConfig {
606 component_id,
607 forward_stdout: None,
608 forward_stderr: None,
609 env_vars: Arc::from([]),
610
611 fuel: None,
612 allowed_hosts: Arc::from(vec![AllowedHostConfig {
613 pattern: HostPattern::parse_with_methods(allowed_host, MethodsPattern::AllMethods)
614 .unwrap(),
615 secret_env_mappings: Vec::new(),
616 replace_in: hashbrown::HashSet::new(),
617 }]),
618 config_section_hint: ConfigSectionHint::ActivityWasm,
619 }
620 }
621
622 pub(crate) async fn new_activity_worker(
623 wasm_path: &str,
624 engine: Arc<Engine>,
625 clock_fn: Box<dyn ClockFn>,
626 sleep: impl Sleep + 'static,
627 ) -> (Arc<dyn Worker>, ComponentId) {
628 new_activity_worker_with_config(wasm_path, engine, clock_fn, sleep, activity_config).await
629 }
630
631 async fn new_activity_worker_with_config(
632 wasm_path: &str,
633 engine: Arc<Engine>,
634 clock_fn: Box<dyn ClockFn>,
635 sleep: impl Sleep + 'static,
636 config_fn: impl FnOnce(ComponentId) -> ActivityConfig,
637 ) -> (Arc<dyn Worker>, ComponentId) {
638 let cancel_registry = CancelRegistry::new();
639 let (wasm_component, component_id) =
640 compile_activity_with_engine(wasm_path, &engine, ComponentType::Activity).await;
641 let (db_forwarder_sender, _) = mpsc::channel(1);
642 (
643 Arc::new(
644 ActivityWorkerCompiled::new_with_config(
645 wasm_component,
646 config_fn(component_id.clone()),
647 engine,
648 clock_fn,
649 Arc::new(sleep),
650 )
651 .unwrap()
652 .into_worker(cancel_registry, &db_forwarder_sender, None),
653 ),
654 component_id,
655 )
656 }
657
658 pub(crate) async fn new_activity(
659 db_pool: Arc<dyn DbPool>,
660 wasm_path: &'static str,
661 clock_fn: Box<dyn ClockFn>,
662 sleep: impl Sleep + 'static,
663 retry_config: ComponentRetryConfig,
664 locking_strategy: LockingStrategy,
665 ) -> ExecTask {
666 new_activity_with_config(
667 db_pool,
668 wasm_path,
669 clock_fn,
670 sleep,
671 activity_config,
672 retry_config,
673 locking_strategy,
674 )
675 .await
676 }
677
678 pub(crate) async fn new_activity_with_config(
679 db_pool: Arc<dyn DbPool>,
680 wasm_path: &'static str,
681 clock_fn: Box<dyn ClockFn>,
682 sleep: impl Sleep + 'static,
683 config_fn: impl FnOnce(ComponentId) -> ActivityConfig,
684 retry_config: ComponentRetryConfig,
685 locking_strategy: LockingStrategy,
686 ) -> ExecTask {
687 let engine = Engines::get_activity_engine_test(EngineConfig::on_demand_testing()).unwrap();
688 let (worker, component_id) = new_activity_worker_with_config(
689 wasm_path,
690 engine,
691 clock_fn.clone_box(),
692 sleep,
693 config_fn,
694 )
695 .await;
696 let exec_config = ExecConfig {
697 batch_size: 1,
698 lock_expiry: Duration::from_secs(1),
699 tick_sleep: Duration::ZERO,
700 component_id,
701 task_limiter_global: None,
702 task_limiter_local: None,
703 executor_id: ExecutorId::generate(),
704 retry_config,
705 locking_strategy,
706 };
707 ExecTask::new_all_ffqns_test(worker, exec_config, clock_fn, db_pool)
708 }
709
710 pub(crate) async fn new_activity_fibo(
711 db_pool: Arc<dyn DbPool>,
712 clock_fn: Box<dyn ClockFn>,
713 sleep: impl Sleep + 'static,
714 locking_strategy: LockingStrategy,
715 ) -> ExecTask {
716 new_activity(
717 db_pool,
718 test_programs_fibo_activity_builder::TEST_PROGRAMS_FIBO_ACTIVITY,
719 clock_fn,
720 sleep,
721 ComponentRetryConfig::ZERO,
722 locking_strategy,
723 )
724 .await
725 }
726
727 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
728 pub(crate) enum TestRetryBehavior {
729 SucceedOnRetry,
730 Fail { expected_retry_err: &'static str },
731 }
732
733 pub(crate) async fn run_http_get_retry_test(
740 listener: std::net::TcpListener,
741 worker: Arc<dyn Worker>,
742 ffqn: FunctionFqn,
743 make_params: impl FnOnce(&str) -> Params,
744 locking_strategy: LockingStrategy,
745 expected_err_contains: &str,
746 test_retry_behavior: TestRetryBehavior,
747 ) {
748 use std::ops::Deref;
749 use wiremock::{
750 Mock, MockServer, ResponseTemplate,
751 matchers::{method, path},
752 };
753
754 const BODY: &str = "ok";
755 const RETRY_EXP_BACKOFF: Duration = Duration::from_millis(10);
756
757 let sim_clock = SimClock::default();
758 let (_guard, db_pool, db_close) = Database::Sqlite.set_up().await;
759
760 let server_address = listener
761 .local_addr()
762 .expect("Failed to get server address.");
763 let uri = format!("http://127.0.0.1:{port}", port = server_address.port());
764
765 let retry_config = ComponentRetryConfig {
766 max_retries: Some(1),
767 retry_exp_backoff: RETRY_EXP_BACKOFF,
768 };
769 let exec_config = ExecConfig {
770 batch_size: 1,
771 lock_expiry: Duration::from_secs(1),
772 tick_sleep: Duration::ZERO,
773 component_id: ComponentId::dummy_activity(),
774 task_limiter_global: None,
775 task_limiter_local: None,
776 executor_id: ExecutorId::generate(),
777 retry_config,
778 locking_strategy,
779 };
780 let ffqns = Arc::from([ffqn.clone()]);
781 let exec_task = ExecTask::new_test(
782 exec_config,
783 worker,
784 sim_clock.clone_box(),
785 db_pool.clone(),
786 ffqns,
787 );
788
789 let params = make_params(&uri);
790 let execution_id = ExecutionId::generate();
791 let created_at = sim_clock.now();
792 let db_connection = db_pool.connection_test().await.unwrap();
793 db_connection
794 .create(CreateRequest {
795 created_at,
796 execution_id: execution_id.clone(),
797 ffqn: ffqn.clone(),
798 params,
799 parent: None,
800 metadata: concepts::ExecutionMetadata::empty(),
801 scheduled_at: created_at,
802 component_id: ComponentId::dummy_activity(),
803 deployment_id: DEPLOYMENT_ID_DUMMY,
804 scheduled_by: None,
805 paused: false,
806 })
807 .await
808 .unwrap();
809
810 let server = MockServer::builder().listener(listener).start().await;
811 Mock::given(method("GET"))
812 .and(path("/"))
813 .respond_with(ResponseTemplate::new(500).set_body_string(BODY))
814 .expect(1)
815 .mount(&server)
816 .await;
817 debug!("started mock server on {}", server.address());
818
819 {
820 assert_eq!(
822 1,
823 exec_task
824 .tick_test(sim_clock.now(), RunId::generate())
825 .await
826 .wait_for_tasks()
827 .await
828 .len()
829 );
830 let exec_log = db_connection.get(&execution_id).await.unwrap();
831
832 let (reason, detail, found_expires_at, http_client_traces) = assert_matches!(
833 &exec_log.last_event().event,
834 ExecutionRequest::TemporarilyFailed {
835 backoff_expires_at,
836 reason,
837 detail: Some(detail),
838 http_client_traces: Some(http_client_traces)
839 }
840 => (reason, detail, *backoff_expires_at, http_client_traces)
841 );
842 assert_eq!(sim_clock.now() + RETRY_EXP_BACKOFF, found_expires_at);
843 assert_eq!("activity returned error", reason.deref());
844 assert!(
845 detail.contains(expected_err_contains),
846 "Unexpected detail: {detail}, expected to contain: {expected_err_contains}"
847 );
848
849 assert_eq!(1, http_client_traces.len());
850 let http_client_trace = http_client_traces.iter().next().unwrap();
851 let (method_actual, uri_actual) = assert_matches!(
852 http_client_trace,
853 HttpClientTrace {
854 req: RequestTrace {
855 method,
856 sent_at: _,
857 uri
858 },
859 resp: Some(ResponseTrace {
860 status: Ok(500),
861 finished_at: _
862 })
863 }
864 => (method, uri)
865 );
866 assert_eq!("GET", method_actual);
867 assert_eq!(format!("{uri}/"), *uri_actual);
868 server.verify().await;
869 }
870
871 assert_eq!(
873 0,
874 exec_task
875 .tick_test(sim_clock.now(), RunId::generate())
876 .await
877 .wait_for_tasks()
878 .await
879 .len()
880 );
881 sim_clock.move_time_forward(RETRY_EXP_BACKOFF);
882
883 server.reset().await;
884
885 if test_retry_behavior == TestRetryBehavior::SucceedOnRetry {
886 Mock::given(method("GET"))
888 .and(path("/"))
889 .respond_with(ResponseTemplate::new(200).set_body_string(BODY))
890 .expect(1)
891 .mount(&server)
892 .await;
893 debug!("Reconfigured the server");
894 } assert_eq!(
897 1,
898 exec_task
899 .tick_test(sim_clock.now(), RunId::generate())
900 .await
901 .wait_for_tasks()
902 .await
903 .len()
904 );
905 let exec_log = db_connection.get(&execution_id).await.unwrap();
906 let res = assert_matches!(exec_log.last_event().event.clone(), ExecutionRequest::Finished { retval, .. } => retval);
907 let wast_val_with_type = match test_retry_behavior {
908 TestRetryBehavior::SucceedOnRetry => {
909 let wast_val_with_type = assert_matches!(res, SupportedFunctionReturnValue::Ok(Some(wast_val_with_type)) => wast_val_with_type);
910 let val = assert_matches!(&wast_val_with_type.value, WastVal::String(val) => val);
911 assert_eq!(BODY, val.deref());
912 wast_val_with_type
913 }
914 TestRetryBehavior::Fail { expected_retry_err } => {
915 let wast_val_with_type = assert_matches!(res, SupportedFunctionReturnValue::Err(Some(wast_val_with_type)) => wast_val_with_type);
916 let val = assert_matches!(&wast_val_with_type.value, WastVal::String(val) => val);
917 assert_eq!(expected_retry_err, val.deref());
918 wast_val_with_type
919 }
920 };
921 assert_matches!(wast_val_with_type.r#type, TypeWrapper::String); drop(db_connection);
924 drop(exec_task);
925 db_close.close().await;
926 }
927
928 pub(crate) async fn create_activity_worker_with_allowed_host(
931 wasm_path: &str,
932 listener: &std::net::TcpListener,
933 ) -> Arc<dyn Worker> {
934 let engine = Engines::get_activity_engine_test(EngineConfig::on_demand_testing()).unwrap();
935 let sim_clock = SimClock::default();
936 let server_address = listener
937 .local_addr()
938 .expect("Failed to get server address.");
939 let uri = format!("http://127.0.0.1:{port}", port = server_address.port());
940
941 let (worker, _) = new_activity_worker_with_config(
942 wasm_path,
943 engine,
944 sim_clock.clone_box(),
945 TokioSleep,
946 {
947 let uri = uri.clone();
948 move |component_id| activity_config_allowed_host(component_id, &uri)
949 },
950 )
951 .await;
952 worker
953 }
954
955 #[rstest]
956 #[tokio::test]
957 async fn fibo_once(
958 #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
959 locking_strategy: LockingStrategy,
960 ) {
961 test_utils::set_up();
962 let sim_clock = SimClock::default();
963 let (_guard, db_pool, db_close) = Database::Sqlite.set_up().await;
964 let db_connection = db_pool.connection().await.unwrap();
965 let exec = new_activity_fibo(
966 db_pool.clone(),
967 sim_clock.clone_box(),
968 TokioSleep,
969 locking_strategy,
970 )
971 .await;
972 let execution_id = ExecutionId::generate();
974 let created_at = sim_clock.now();
975 let params = Params::from_json_values_test(vec![json!(FIBO_10_INPUT)]);
976 db_connection
977 .create(CreateRequest {
978 created_at,
979 execution_id: execution_id.clone(),
980 ffqn: FIBO_ACTIVITY_FFQN,
981 params,
982 parent: None,
983 metadata: concepts::ExecutionMetadata::empty(),
984 scheduled_at: created_at,
985 component_id: exec.config.component_id.clone(),
986
987 deployment_id: DEPLOYMENT_ID_DUMMY,
988 scheduled_by: None,
989 paused: false,
990 })
991 .await
992 .unwrap();
993 let executed = exec
995 .tick_test_await(sim_clock.now(), RunId::generate())
996 .await;
997 assert_eq!(vec![execution_id.clone()], executed);
998 let res = db_connection
1000 .wait_for_finished_result(
1001 &execution_id,
1002 Some(Box::pin(future::ready(TimeoutOutcome::Cancel))),
1003 )
1004 .await
1005 .unwrap();
1006 let res = assert_matches!(res, SupportedFunctionReturnValue::Ok(ok) => ok);
1007 let fibo = assert_matches!(res,
1008 Some(WastValWithType {value: WastVal::U64(val), r#type: TypeWrapper::U64 }) => val);
1009 assert_eq!(FIBO_10_OUTPUT, fibo);
1010 drop(db_connection);
1011 db_close.close().await;
1012 }
1013
1014 #[tokio::test]
1015 async fn limit_reached() {
1016 const FIBO_INPUT: u8 = 10;
1017 const LOCK_EXPIRY_MILLIS: u64 = 1100;
1018 const TASKS: u32 = 10;
1019 const MAX_INSTANCES: u32 = 1;
1020
1021 test_utils::set_up();
1022 let fibo_input = env_or_default("FIBO_INPUT", FIBO_INPUT);
1023 let lock_expiry =
1024 Duration::from_millis(env_or_default("LOCK_EXPIRY_MILLIS", LOCK_EXPIRY_MILLIS));
1025 let tasks = env_or_default("TASKS", TASKS);
1026 let max_instances = env_or_default("MAX_INSTANCES", MAX_INSTANCES);
1027
1028 let pool_opts = PoolingOptions {
1029 pooling_total_component_instances: Some(max_instances),
1030 pooling_total_stacks: Some(max_instances),
1031 pooling_total_core_instances: Some(max_instances),
1032 pooling_total_memories: Some(max_instances),
1033 pooling_total_tables: Some(max_instances),
1034 ..Default::default()
1035 };
1036
1037 let engine =
1038 Engines::get_activity_engine_test(EngineConfig::pooling_nocache_testing(pool_opts))
1039 .unwrap();
1040
1041 let (fibo_worker, _) = new_activity_worker(
1042 test_programs_fibo_activity_builder::TEST_PROGRAMS_FIBO_ACTIVITY,
1043 engine,
1044 Now.clone_box(),
1045 TokioSleep,
1046 )
1047 .await;
1048 let join_handles = (0..tasks)
1050 .map(|_| {
1051 let fibo_worker = fibo_worker.clone();
1052 let execution_id = ExecutionId::generate();
1053 let ctx = WorkerContext {
1054 execution_id: execution_id.clone(),
1055 metadata: concepts::ExecutionMetadata::empty(),
1056 component_digest: ComponentId::dummy_activity().component_digest,
1057 ffqn: FIBO_ACTIVITY_FFQN,
1058 params: Params::from_json_values_test(vec![json!(fibo_input)]),
1059 event_history: Vec::new(),
1060 responses: Vec::new(),
1061 parent: None,
1062 version: Version::new(0),
1063 can_be_retried: false,
1064 worker_span: info_span!("worker-test"),
1065 locked_event: Locked {
1066 component_id: ComponentId::dummy_activity(),
1067 executor_id: ExecutorId::generate(),
1068 deployment_id: DEPLOYMENT_ID_DUMMY,
1069 run_id: RunId::generate(),
1070 lock_expires_at: Now.now() + lock_expiry,
1071 retry_config: ComponentRetryConfig::ZERO,
1072 },
1073 executor_close_watcher: tokio::sync::watch::channel(false).1,
1074 };
1075 tokio::spawn(async move { fibo_worker.run(ctx).await })
1076 })
1077 .collect::<Vec<_>>();
1078 let mut limit_reached = 0;
1079 for jh in join_handles {
1080 if matches!(
1081 jh.await.unwrap(),
1082 WorkerResult::Err(WorkerError::LimitReached { .. })
1083 ) {
1084 limit_reached += 1;
1085 }
1086 }
1087 assert!(limit_reached > 0, "Limit was not reached");
1088 }
1089
1090 #[rstest::rstest]
1091 #[case(
1092 10,
1093 100,
1094 SupportedFunctionReturnValue::ExecutionFailure(FinishedExecutionFailure{
1095 kind: ExecutionFailureKind::TimedOut,
1096 reason: None, detail: None
1097 })
1098 )] #[case(10, 10, SUPPORTED_RETURN_VALUE_OK_EMPTY)] #[case(
1101 1500,
1102 1,
1103 SupportedFunctionReturnValue::ExecutionFailure(FinishedExecutionFailure{
1104 kind: ExecutionFailureKind::TimedOut,
1105 reason: None, detail: None
1106 })
1107 )] #[tokio::test]
1109 async fn sleep_should_produce_temporary_timeout(
1110 #[case] sleep_millis: u32,
1111 #[case] sleep_iterations: u32,
1112 #[case] expected: concepts::SupportedFunctionReturnValue,
1113 #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
1114 locking_strategy: LockingStrategy,
1115 ) {
1116 const LOCK_EXPIRY: Duration = Duration::from_millis(500);
1117 test_utils::set_up();
1118 let sim_clock = SimClock::default();
1119 let (_guard, db_pool, db_close) = Database::Sqlite.set_up().await;
1120 let engine = Engines::get_activity_engine_test(EngineConfig::on_demand_testing()).unwrap();
1121 let (worker, _) = new_activity_worker(
1122 test_programs_sleep_activity_builder::TEST_PROGRAMS_SLEEP_ACTIVITY,
1123 engine,
1124 sim_clock.clone_box(),
1125 TokioSleep,
1126 )
1127 .await;
1128
1129 let exec_config = ExecConfig {
1130 batch_size: 1,
1131 lock_expiry: LOCK_EXPIRY,
1132 tick_sleep: Duration::ZERO,
1133 component_id: ComponentId::dummy_activity(),
1134 task_limiter_global: None,
1135 task_limiter_local: None,
1136 executor_id: ExecutorId::generate(),
1137 retry_config: ComponentRetryConfig::ZERO,
1138 locking_strategy,
1139 };
1140 let ffqns = Arc::from([SLEEP_LOOP_ACTIVITY_FFQN]);
1141 let exec_task = ExecTask::new_test(
1142 exec_config,
1143 worker,
1144 sim_clock.clone_box(),
1145 db_pool.clone(),
1146 ffqns,
1147 );
1148
1149 let execution_id = ExecutionId::generate();
1151 info!("Testing {execution_id}");
1152 let created_at = sim_clock.now();
1153 let db_connection = db_pool.connection().await.unwrap();
1154 db_connection
1155 .create(CreateRequest {
1156 created_at,
1157 execution_id: execution_id.clone(),
1158 ffqn: SLEEP_LOOP_ACTIVITY_FFQN,
1159 params: Params::from_json_values_test(vec![
1160 json!(
1161 {"milliseconds": sleep_millis}),
1162 json!(sleep_iterations),
1163 ]),
1164 parent: None,
1165 metadata: concepts::ExecutionMetadata::empty(),
1166 scheduled_at: created_at,
1167 component_id: ComponentId::dummy_activity(),
1168 deployment_id: DEPLOYMENT_ID_DUMMY,
1169 scheduled_by: None,
1170 paused: false,
1171 })
1172 .await
1173 .unwrap();
1174
1175 assert_eq!(
1177 1,
1178 exec_task
1179 .tick_test(sim_clock.now(), RunId::generate())
1180 .await
1181 .wait_for_tasks()
1182 .await
1183 .len()
1184 );
1185
1186 let exec_log = db_connection.get(&execution_id).await.unwrap();
1188 let retval = assert_matches!(
1189 exec_log.last_event().event.clone(),
1190 ExecutionRequest::Finished { retval, .. } => retval
1191 );
1192 assert_eq!(expected, retval);
1193
1194 drop(exec_task);
1195 db_close.close().await;
1196 }
1197
1198 #[rstest::rstest]
1199 #[case(1, 2_000)] #[case(2_000, 1)] #[tokio::test]
1202 async fn long_running_execution_should_timeout(
1203 #[case] sleep_millis: u64,
1204 #[case] sleep_iterations: u32,
1205 ) {
1206 const TIMEOUT: Duration = Duration::from_millis(200);
1207 test_utils::set_up();
1208
1209 let engine = Engines::get_activity_engine_test(EngineConfig::on_demand_testing()).unwrap();
1210
1211 let sim_clock = SimClock::epoch();
1212 let (worker, _) = new_activity_worker(
1213 test_programs_sleep_activity_builder::TEST_PROGRAMS_SLEEP_ACTIVITY,
1214 engine,
1215 sim_clock.clone_box(),
1216 TokioSleep,
1217 )
1218 .await;
1219
1220 let executed_at = sim_clock.now();
1221 let version = Version::new(10);
1222 let ctx = WorkerContext {
1223 execution_id: ExecutionId::generate(),
1224 metadata: concepts::ExecutionMetadata::empty(),
1225 component_digest: ComponentId::dummy_activity().component_digest,
1226 ffqn: SLEEP_LOOP_ACTIVITY_FFQN,
1227 params: Params::from_json_values_test(vec![
1228 json!(
1229 {"milliseconds": sleep_millis}),
1230 json!(sleep_iterations),
1231 ]),
1232 event_history: Vec::new(),
1233 responses: Vec::new(),
1234 parent: None,
1235 version: version.clone(),
1236 can_be_retried: false,
1237 worker_span: info_span!("worker-test"),
1238 locked_event: Locked {
1239 component_id: ComponentId::dummy_activity(),
1240 executor_id: ExecutorId::generate(),
1241 deployment_id: DEPLOYMENT_ID_DUMMY,
1242 run_id: RunId::generate(),
1243 lock_expires_at: executed_at + TIMEOUT,
1244 retry_config: ComponentRetryConfig::ZERO,
1245 },
1246 executor_close_watcher: tokio::sync::watch::channel(false).1,
1247 };
1248 let WorkerResult::Err(err) = worker.run(ctx).await else {
1249 panic!()
1250 };
1251 let actual_version = assert_matches!(
1252 err,
1253 WorkerError::TemporaryTimeout {
1254 http_client_traces:_,
1255 version
1256 }
1257 => version
1258 );
1259 assert_eq!(version, actual_version);
1260 }
1261
1262 #[tokio::test]
1263 async fn execution_deadline_before_now_should_timeout() {
1264 test_utils::set_up();
1265
1266 let engine = Engines::get_activity_engine_test(EngineConfig::on_demand_testing()).unwrap();
1267 let sim_clock = SimClock::epoch();
1268 let (worker, _) = new_activity_worker(
1269 test_programs_sleep_activity_builder::TEST_PROGRAMS_SLEEP_ACTIVITY,
1270 engine,
1271 sim_clock.clone_box(),
1272 TokioSleep,
1273 )
1274 .await;
1275 let execution_deadline = sim_clock.now();
1277 sim_clock.move_time_forward(Duration::from_millis(100));
1278 let version = Version::new(10);
1279 let ctx = WorkerContext {
1280 execution_id: ExecutionId::generate(),
1281 metadata: concepts::ExecutionMetadata::empty(),
1282 component_digest: ComponentId::dummy_activity().component_digest,
1283 ffqn: SLEEP_LOOP_ACTIVITY_FFQN,
1284 params: Params::from_json_values_test(vec![
1285 json!(
1286 {"milliseconds": 1}),
1287 json!(1),
1288 ]),
1289 event_history: Vec::new(),
1290 responses: Vec::new(),
1291 parent: None,
1292 version: version.clone(),
1293 can_be_retried: false,
1294 worker_span: info_span!("worker-test"),
1295 locked_event: Locked {
1296 component_id: ComponentId::dummy_activity(),
1297 executor_id: ExecutorId::generate(),
1298 deployment_id: DEPLOYMENT_ID_DUMMY,
1299 run_id: RunId::generate(),
1300 lock_expires_at: execution_deadline,
1301 retry_config: ComponentRetryConfig::ZERO,
1302 },
1303 executor_close_watcher: tokio::sync::watch::channel(false).1,
1304 };
1305 let WorkerResult::Err(err) = worker.run(ctx).await else {
1306 panic!()
1307 };
1308 let actual_version = assert_matches!(
1309 err,
1310 WorkerError::TemporaryTimeout {
1311 http_client_traces: None,
1312 version: actual_version,
1313 }
1314 => actual_version
1315 );
1316 assert_eq!(version, actual_version);
1317 }
1318
1319 #[rstest]
1320 #[tokio::test]
1321 async fn http_get_simple(
1322 #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
1323 locking_strategy: LockingStrategy,
1324 ) {
1325 use std::ops::Deref;
1326 use wiremock::{
1327 Mock, MockServer, ResponseTemplate,
1328 matchers::{method, path},
1329 };
1330 const BODY: &str = "ok";
1331 test_utils::set_up();
1332 info!("All set up");
1333 let sim_clock = SimClock::default();
1334 let (_guard, db_pool, db_close) = Database::Sqlite.set_up().await;
1335 let engine = Engines::get_activity_engine_test(EngineConfig::on_demand_testing()).unwrap();
1336
1337 let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1338 let server_address = listener
1339 .local_addr()
1340 .expect("Failed to get server address.");
1341 let uri = format!("http://127.0.0.1:{port}", port = server_address.port());
1342
1343 let (worker, _) = new_activity_worker_with_config(
1344 test_programs_http_get_activity_builder::TEST_PROGRAMS_HTTP_GET_ACTIVITY,
1345 engine,
1346 sim_clock.clone_box(),
1347 TokioSleep,
1348 {
1349 let uri = uri.clone();
1350 move |component_id| activity_config_allowed_host(component_id, &uri)
1351 },
1352 )
1353 .await;
1354
1355 let exec_config = ExecConfig {
1356 batch_size: 1,
1357 lock_expiry: Duration::from_secs(1),
1358 tick_sleep: Duration::ZERO,
1359 component_id: ComponentId::dummy_activity(),
1360 task_limiter_global: None,
1361 task_limiter_local: None,
1362 executor_id: ExecutorId::generate(),
1363 retry_config: ComponentRetryConfig::ZERO,
1364 locking_strategy,
1365 };
1366 let ffqns = Arc::from([HTTP_GET_SUCCESSFUL_ACTIVITY]);
1367 let exec_task = ExecTask::new_test(
1368 exec_config,
1369 worker,
1370 sim_clock.clone_box(),
1371 db_pool.clone(),
1372 ffqns,
1373 );
1374
1375 let params = Params::from_json_values_test(vec![json!(uri.clone())]);
1376 let execution_id = ExecutionId::generate();
1377 let created_at = sim_clock.now();
1378 let db_connection = db_pool.connection_test().await.unwrap();
1379 info!("Creating execution");
1380 let stopwatch = std::time::Instant::now();
1381 db_connection
1382 .create(CreateRequest {
1383 created_at,
1384 execution_id: execution_id.clone(),
1385 ffqn: HTTP_GET_SUCCESSFUL_ACTIVITY,
1386 params,
1387 parent: None,
1388 metadata: concepts::ExecutionMetadata::empty(),
1389 scheduled_at: created_at,
1390 component_id: ComponentId::dummy_activity(),
1391
1392 deployment_id: DEPLOYMENT_ID_DUMMY,
1393 scheduled_by: None,
1394 paused: false,
1395 })
1396 .await
1397 .unwrap();
1398
1399 let server = MockServer::builder().listener(listener).start().await;
1400 Mock::given(method("GET"))
1401 .and(path("/"))
1402 .respond_with(ResponseTemplate::new(200).set_body_string(BODY))
1403 .expect(1)
1404 .mount(&server)
1405 .await;
1406
1407 assert_eq!(
1408 1,
1409 exec_task
1410 .tick_test(sim_clock.now(), RunId::generate())
1411 .await
1412 .wait_for_tasks()
1413 .await
1414 .len()
1415 );
1416 let exec_log = db_connection.get(&execution_id).await.unwrap();
1417 let stopwatch = stopwatch.elapsed();
1418 info!("Finished in {stopwatch:?}");
1419 let (res, http_client_traces) = assert_matches!(
1420 exec_log.last_event().event.clone(),
1421 ExecutionRequest::Finished { retval, http_client_traces: Some(http_client_traces) }
1422 => (retval, http_client_traces));
1423 let wast_val_with_type = assert_matches!(res, SupportedFunctionReturnValue::Ok(Some(wast_val_with_type)) => wast_val_with_type);
1424 let val = assert_matches!(wast_val_with_type.value, WastVal::String(val) => val);
1425 assert_eq!(BODY, val.deref());
1426 assert_matches!(wast_val_with_type.r#type, TypeWrapper::String);
1428 assert_eq!(1, http_client_traces.len());
1429 let http_client_trace = http_client_traces.into_iter().next().unwrap();
1430 let (method, uri_actual) = assert_matches!(
1431 http_client_trace,
1432 HttpClientTrace {
1433 req: RequestTrace {
1434 method,
1435 sent_at: _,
1436 uri
1437 },
1438 resp: Some(ResponseTrace {
1439 status: Ok(200),
1440 finished_at: _
1441 })
1442 }
1443 => (method, uri)
1444 );
1445 assert_eq!("GET", method);
1446 assert_eq!(format!("{uri}/"), *uri_actual);
1447 drop(db_connection);
1448 drop(exec_task);
1449 db_close.close().await;
1450 }
1451
1452 #[rstest]
1453 #[tokio::test]
1454 async fn http_get_activity_trap_should_be_turned_into_finished_execution_error_permanent_failure(
1455 #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
1456 locking_strategy: LockingStrategy,
1457 ) {
1458 use wiremock::{
1459 Mock, MockServer, ResponseTemplate,
1460 matchers::{method, path},
1461 };
1462 const STATUS: u16 = 418; test_utils::set_up();
1464 info!("All set up");
1465 let sim_clock = SimClock::default();
1466 let (_guard, db_pool, db_close) = Database::Sqlite.set_up().await;
1467 let engine = Engines::get_activity_engine_test(EngineConfig::on_demand_testing()).unwrap();
1468
1469 let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1470 let server_address = listener
1471 .local_addr()
1472 .expect("Failed to get server address.");
1473 let uri = format!("http://127.0.0.1:{port}", port = server_address.port());
1474
1475 let (worker, _) = new_activity_worker_with_config(
1476 test_programs_http_get_activity_builder::TEST_PROGRAMS_HTTP_GET_ACTIVITY,
1477 engine,
1478 sim_clock.clone_box(),
1479 TokioSleep,
1480 {
1481 let uri = uri.clone();
1482 move |component_id| activity_config_allowed_host(component_id, &uri)
1483 },
1484 )
1485 .await;
1486
1487 let exec_config = ExecConfig {
1488 batch_size: 1,
1489 lock_expiry: Duration::from_secs(1),
1490 tick_sleep: Duration::ZERO,
1491 component_id: ComponentId::dummy_activity(),
1492 task_limiter_global: None,
1493 task_limiter_local: None,
1494 executor_id: ExecutorId::generate(),
1495 retry_config: ComponentRetryConfig::ZERO,
1496 locking_strategy,
1497 };
1498 let ffqns = Arc::from([HTTP_GET_SUCCESSFUL_ACTIVITY]);
1499 let exec_task = ExecTask::new_test(
1500 exec_config,
1501 worker,
1502 sim_clock.clone_box(),
1503 db_pool.clone(),
1504 ffqns,
1505 );
1506
1507 let params = Params::from_json_values_test(vec![json!(uri.clone())]);
1508 let execution_id = ExecutionId::generate();
1509 let created_at = sim_clock.now();
1510 let db_connection = db_pool.connection_test().await.unwrap();
1511 info!("Creating execution");
1512 let stopwatch = std::time::Instant::now();
1513 db_connection
1514 .create(CreateRequest {
1515 created_at,
1516 execution_id: execution_id.clone(),
1517 ffqn: HTTP_GET_SUCCESSFUL_ACTIVITY,
1518 params,
1519 parent: None,
1520 metadata: concepts::ExecutionMetadata::empty(),
1521 scheduled_at: created_at,
1522 component_id: ComponentId::dummy_activity(),
1523
1524 deployment_id: DEPLOYMENT_ID_DUMMY,
1525 scheduled_by: None,
1526 paused: false,
1527 })
1528 .await
1529 .unwrap();
1530
1531 let server = MockServer::builder().listener(listener).start().await;
1532 Mock::given(method("GET"))
1533 .and(path("/"))
1534 .respond_with(ResponseTemplate::new(STATUS).set_body_string(""))
1535 .expect(1)
1536 .mount(&server)
1537 .await;
1538
1539 assert_eq!(
1540 1,
1541 exec_task
1542 .tick_test(sim_clock.now(), RunId::generate())
1543 .await
1544 .wait_for_tasks()
1545 .await
1546 .len()
1547 );
1548 let exec_log = db_connection.get(&execution_id).await.unwrap();
1549 let stopwatch = stopwatch.elapsed();
1550 info!("Finished in {stopwatch:?}");
1551 let (res, http_client_traces) = assert_matches!(
1552 exec_log.last_event().event.clone(),
1553 ExecutionRequest::Finished { retval, http_client_traces: Some(http_client_traces) }
1554 => (retval, http_client_traces));
1555 let res = assert_matches!(res, SupportedFunctionReturnValue::ExecutionFailure(err) => err);
1556 let reason = assert_matches!(
1557 res,
1558 FinishedExecutionFailure {
1559 kind: ExecutionFailureKind::Uncategorized,
1560 reason: Some(reason), detail: _
1562 } => reason
1563 );
1564 assert!(reason.starts_with("activity trap"), "{reason}");
1565
1566 assert_eq!(1, http_client_traces.len());
1567 let http_client_trace = http_client_traces.into_iter().next().unwrap();
1568 let (method, uri_actual) = assert_matches!(
1569 http_client_trace,
1570 HttpClientTrace {
1571 req: RequestTrace {
1572 method,
1573 sent_at: _,
1574 uri
1575 },
1576 resp: Some(ResponseTrace {
1577 status: Ok(STATUS),
1578 finished_at: _
1579 })
1580 }
1581 => (method, uri)
1582 );
1583 assert_eq!("GET", method);
1584 assert_eq!(format!("{uri}/"), *uri_actual);
1585 drop(db_connection);
1586 drop(exec_task);
1587 db_close.close().await;
1588 }
1589
1590 #[rstest::rstest]
1591 #[tokio::test]
1592 async fn http_get_retry_on_fallible_err(
1593 #[values(TestRetryBehavior::SucceedOnRetry,TestRetryBehavior::Fail { expected_retry_err: "wrong status code: 404" })]
1594 test_retry_behavior: TestRetryBehavior,
1595 #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
1596 locking_strategy: LockingStrategy,
1597 ) {
1598 test_utils::set_up();
1599
1600 let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1601 let worker = create_activity_worker_with_allowed_host(
1602 test_programs_http_get_activity_builder::TEST_PROGRAMS_HTTP_GET_ACTIVITY,
1603 &listener,
1604 )
1605 .await;
1606
1607 run_http_get_retry_test(
1608 listener,
1609 worker,
1610 HTTP_GET_SUCCESSFUL_ACTIVITY,
1611 |uri| Params::from_json_values_test(vec![json!(uri)]),
1612 locking_strategy,
1613 "wrong status code: 500",
1614 test_retry_behavior,
1615 )
1616 .await;
1617 }
1618
1619 #[tokio::test]
1620 async fn http_get_denied_host() {
1621 use wiremock::{
1622 Mock, MockServer, ResponseTemplate,
1623 matchers::{method, path},
1624 };
1625 test_utils::set_up();
1626 let sim_clock = SimClock::default();
1627 let (_guard, db_pool, db_close) = Database::Sqlite.set_up().await;
1628 let engine = Engines::get_activity_engine_test(EngineConfig::on_demand_testing()).unwrap();
1629
1630 let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1631 let server_address = listener.local_addr().unwrap();
1632 let uri = format!("http://127.0.0.1:{port}", port = server_address.port());
1633
1634 let (worker, _) = new_activity_worker_with_config(
1636 test_programs_http_get_activity_builder::TEST_PROGRAMS_HTTP_GET_ACTIVITY,
1637 engine,
1638 sim_clock.clone_box(),
1639 TokioSleep,
1640 activity_config, )
1642 .await;
1643
1644 let exec_config = ExecConfig {
1645 batch_size: 1,
1646 lock_expiry: Duration::from_secs(1),
1647 tick_sleep: Duration::ZERO,
1648 component_id: ComponentId::dummy_activity(),
1649 task_limiter_global: None,
1650 task_limiter_local: None,
1651 executor_id: ExecutorId::generate(),
1652 retry_config: ComponentRetryConfig::ZERO,
1653 locking_strategy: LockingStrategy::ByComponentDigest,
1654 };
1655 let ffqns = Arc::from([HTTP_GET_SUCCESSFUL_ACTIVITY]);
1656 let exec_task = ExecTask::new_test(
1657 exec_config,
1658 worker,
1659 sim_clock.clone_box(),
1660 db_pool.clone(),
1661 ffqns,
1662 );
1663
1664 let params = Params::from_json_values_test(vec![json!(uri.clone())]);
1665 let execution_id = ExecutionId::generate();
1666 let created_at = sim_clock.now();
1667 let db_connection = db_pool.connection_test().await.unwrap();
1668 db_connection
1669 .create(CreateRequest {
1670 created_at,
1671 execution_id: execution_id.clone(),
1672 ffqn: HTTP_GET_SUCCESSFUL_ACTIVITY,
1673 params,
1674 parent: None,
1675 metadata: concepts::ExecutionMetadata::empty(),
1676 scheduled_at: created_at,
1677 component_id: ComponentId::dummy_activity(),
1678 deployment_id: DEPLOYMENT_ID_DUMMY,
1679 scheduled_by: None,
1680 paused: false,
1681 })
1682 .await
1683 .unwrap();
1684
1685 let server = MockServer::builder().listener(listener).start().await;
1686 Mock::given(method("GET"))
1687 .and(path("/"))
1688 .respond_with(ResponseTemplate::new(200).set_body_string("should not reach"))
1689 .expect(0) .mount(&server)
1691 .await;
1692
1693 assert_eq!(
1694 1,
1695 exec_task
1696 .tick_test(sim_clock.now(), RunId::generate())
1697 .await
1698 .wait_for_tasks()
1699 .await
1700 .len()
1701 );
1702 let exec_log = db_connection.get(&execution_id).await.unwrap();
1703 let retval = assert_matches!(
1704 exec_log.last_event().event.clone(),
1705 ExecutionRequest::Finished { retval, .. } => retval
1706 );
1707 let err = assert_matches!(retval, SupportedFunctionReturnValue::Err(Some(err)) => err);
1710 let err = assert_matches!(err.value, WastVal::String(err) => err);
1711 assert_eq!("ErrorCode::HttpRequestDenied", err);
1712 server.verify().await;
1714 drop(db_connection);
1715 drop(exec_task);
1716 db_close.close().await;
1717 }
1718
1719 #[rstest]
1720 #[tokio::test]
1721 async fn http_get_with_secret(
1722 #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
1723 locking_strategy: LockingStrategy,
1724 ) {
1725 use crate::http_request_policy::{AllowedHostConfig, MethodsPattern, ReplacementLocation};
1726 use hashbrown::HashSet;
1727 use secrecy::SecretString;
1728 use wiremock::{
1729 Mock, MockServer, ResponseTemplate,
1730 matchers::{header, method, path, query_param},
1731 };
1732 const SECRET_VALUE: &str = "my-secret-api-key-12345";
1733 const SECRET_ENV_VAR: &str = "TEST_API_KEY";
1734 test_utils::set_up();
1735 let sim_clock = SimClock::default();
1736 let (_guard, db_pool, db_close) = Database::Sqlite.set_up().await;
1737 let engine = Engines::get_activity_engine_test(EngineConfig::on_demand_testing()).unwrap();
1738
1739 let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1740 let server_address = listener.local_addr().unwrap();
1741 let allowed_host = format!("http://127.0.0.1:{port}", port = server_address.port());
1742 let host_pattern =
1743 HostPattern::parse_with_methods(&allowed_host, MethodsPattern::AllMethods).unwrap();
1744
1745 let (worker, component_id) = new_activity_worker_with_config(
1747 test_programs_http_get_activity_builder::TEST_PROGRAMS_HTTP_GET_ACTIVITY,
1748 engine,
1749 sim_clock.clone_box(),
1750 TokioSleep,
1751 {
1752 let host_pattern = host_pattern.clone();
1753 move |component_id| ActivityConfig {
1754 component_id,
1755 forward_stdout: None,
1756 forward_stderr: None,
1757 env_vars: Arc::from([]),
1758
1759 fuel: None,
1760 allowed_hosts: Arc::from(vec![AllowedHostConfig {
1761 pattern: host_pattern,
1762 secret_env_mappings: vec![(
1763 SECRET_ENV_VAR.to_string(),
1764 SecretString::from(SECRET_VALUE.to_string()),
1765 )],
1766 replace_in: HashSet::from_iter([
1767 ReplacementLocation::Headers,
1768 ReplacementLocation::Params,
1769 ReplacementLocation::Body,
1770 ]),
1771 }]),
1772 config_section_hint: ConfigSectionHint::ActivityWasm,
1773 }
1774 },
1775 )
1776 .await;
1777
1778 let exec_config = ExecConfig {
1779 batch_size: 1,
1780 lock_expiry: Duration::from_secs(1),
1781 tick_sleep: Duration::ZERO,
1782 component_id: component_id.clone(),
1783 task_limiter_global: None,
1784 task_limiter_local: None,
1785 executor_id: ExecutorId::generate(),
1786 retry_config: ComponentRetryConfig::ZERO,
1787 locking_strategy,
1788 };
1789 let secret_get_ffqn: FunctionFqn =
1790 FunctionFqn::new_static("testing:http/http-get", "secret-get");
1791 let ffqns = Arc::from([secret_get_ffqn.clone()]);
1792 let exec_task = ExecTask::new_test(
1793 exec_config,
1794 worker,
1795 sim_clock.clone_box(),
1796 db_pool.clone(),
1797 ffqns,
1798 );
1799
1800 let url_with_placeholder = format!("{allowed_host}/?secret={SECRET_ENV_VAR}");
1803 let header_with_placeholder =
1804 Some(("X-API-Key".to_string(), format!("Bearer {SECRET_ENV_VAR}")));
1805 let params = Params::from_json_values_test(vec![
1806 json!(url_with_placeholder),
1807 json!(SECRET_ENV_VAR),
1808 json!(header_with_placeholder),
1809 ]);
1810 let execution_id = ExecutionId::generate();
1811 let created_at = sim_clock.now();
1812 let db_connection = db_pool.connection_test().await.unwrap();
1813 db_connection
1814 .create(CreateRequest {
1815 created_at,
1816 execution_id: execution_id.clone(),
1817 ffqn: secret_get_ffqn.clone(),
1818 params,
1819 parent: None,
1820 metadata: concepts::ExecutionMetadata::empty(),
1821 scheduled_at: created_at,
1822 component_id,
1823 deployment_id: DEPLOYMENT_ID_DUMMY,
1824 scheduled_by: None,
1825 paused: false,
1826 })
1827 .await
1828 .unwrap();
1829
1830 let server = MockServer::builder().listener(listener).start().await;
1831 Mock::given(method("GET"))
1833 .and(path("/"))
1834 .and(query_param("secret", SECRET_VALUE))
1835 .and(header("X-API-Key", format!("Bearer {SECRET_VALUE}")))
1836 .respond_with(ResponseTemplate::new(200).set_body_string("secret-received"))
1837 .expect(1)
1838 .mount(&server)
1839 .await;
1840
1841 assert_eq!(
1842 1,
1843 exec_task
1844 .tick_test(sim_clock.now(), RunId::generate())
1845 .await
1846 .wait_for_tasks()
1847 .await
1848 .len()
1849 );
1850 let exec_log = db_connection.get(&execution_id).await.unwrap();
1851 let retval = assert_matches!(
1852 exec_log.last_event().event.clone(),
1853 ExecutionRequest::Finished { retval, .. } => retval
1854 );
1855 assert_matches!(retval, SupportedFunctionReturnValue::Ok(..));
1857 server.verify().await;
1858 drop(db_connection);
1859 drop(exec_task);
1860 db_close.close().await;
1861 }
1862
1863 #[rstest::rstest(
1864 param => [
1865 r#"{"image": "foo", "a": false, "b":false}"#,
1866 r#"{"b": false, "a":false, "image": "foo"}"#,
1867 ])]
1868 #[tokio::test]
1869 async fn record_field_ordering(
1870 param: &str,
1871 #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
1872 locking_strategy: LockingStrategy,
1873 ) {
1874 test_utils::set_up();
1875 let sim_clock = SimClock::default();
1876 let (_guard, db_pool, db_close) = Database::Sqlite.set_up().await;
1877 let db_connection = db_pool.connection().await.unwrap();
1878 let exec = new_activity_with_config(
1879 db_pool.clone(),
1880 test_programs_serde_activity_builder::TEST_PROGRAMS_SERDE_ACTIVITY,
1881 sim_clock.clone_box(),
1882 TokioSleep,
1883 move |component_id| ActivityConfig {
1884 component_id,
1885 forward_stdout: Some(StdOutputConfig::Stderr),
1886 forward_stderr: Some(StdOutputConfig::Stderr),
1887 env_vars: Arc::default(),
1888
1889 fuel: None,
1890 allowed_hosts: Arc::from([]),
1891 config_section_hint: ConfigSectionHint::ActivityWasm,
1892 },
1893 ComponentRetryConfig::ZERO,
1894 locking_strategy,
1895 )
1896 .await;
1897 let ffqn = FunctionFqn::new_static_tuple(
1899 test_programs_serde_activity_builder::exports::testing::serde::serde::REC,
1900 );
1901 let execution_id = ExecutionId::generate();
1902 let created_at = sim_clock.now();
1903 db_connection
1904 .create(CreateRequest {
1905 created_at,
1906 execution_id: execution_id.clone(),
1907 ffqn,
1908 params: Params::from_json_values_test(vec![serde_json::from_str(param).unwrap()]),
1909 parent: None,
1910 metadata: concepts::ExecutionMetadata::empty(),
1911 scheduled_at: created_at,
1912 component_id: exec.config.component_id.clone(),
1913
1914 deployment_id: DEPLOYMENT_ID_DUMMY,
1915 scheduled_by: None,
1916 paused: false,
1917 })
1918 .await
1919 .unwrap();
1920 let executed = exec
1921 .tick_test_await(sim_clock.now(), RunId::generate())
1922 .await;
1923 assert_eq!(vec![execution_id.clone()], executed);
1924 let res = db_connection
1926 .wait_for_finished_result(
1927 &execution_id,
1928 Some(Box::pin(future::ready(TimeoutOutcome::Cancel))),
1929 )
1930 .await
1931 .unwrap();
1932 let record = assert_matches!(res, SupportedFunctionReturnValue::Ok(record) => record);
1933 insta::with_settings!({
1934 prepend_module_to_snapshot => false},
1935 {
1936 assert_json_snapshot!(record);
1937 }
1938 );
1939 db_close.close().await;
1940 }
1941
1942 #[rstest]
1943 #[tokio::test]
1944 async fn variant_with_optional_none(
1945 #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
1946 locking_strategy: LockingStrategy,
1947 ) {
1948 test_utils::set_up();
1949 let sim_clock = SimClock::default();
1950 let (_guard, db_pool, db_close) = Database::Sqlite.set_up().await;
1951 let db_connection = db_pool.connection().await.unwrap();
1952 let exec = new_activity_with_config(
1953 db_pool.clone(),
1954 test_programs_serde_activity_builder::TEST_PROGRAMS_SERDE_ACTIVITY,
1955 sim_clock.clone_box(),
1956 TokioSleep,
1957 move |component_id| ActivityConfig {
1958 component_id,
1959 forward_stdout: Some(StdOutputConfig::Stderr),
1960 forward_stderr: Some(StdOutputConfig::Stderr),
1961 env_vars: Arc::default(),
1962
1963 fuel: None,
1964 allowed_hosts: Arc::from([]),
1965 config_section_hint: ConfigSectionHint::ActivityWasm,
1966 },
1967 ComponentRetryConfig::ZERO,
1968 locking_strategy,
1969 )
1970 .await;
1971 let ffqn = FunctionFqn::new_static_tuple(
1973 test_programs_serde_activity_builder::exports::testing::serde::serde::VAR,
1974 );
1975 let execution_id = ExecutionId::generate();
1976 let created_at = sim_clock.now();
1977 db_connection
1978 .create(CreateRequest {
1979 created_at,
1980 execution_id: execution_id.clone(),
1981 ffqn,
1982 params: Params::from_json_values_test(vec![json!({"var1":null})]),
1983 parent: None,
1984 metadata: concepts::ExecutionMetadata::empty(),
1985 scheduled_at: created_at,
1986 component_id: exec.config.component_id.clone(),
1987
1988 deployment_id: DEPLOYMENT_ID_DUMMY,
1989 scheduled_by: None,
1990 paused: false,
1991 })
1992 .await
1993 .unwrap();
1994 let executed = exec
1995 .tick_test_await(sim_clock.now(), RunId::generate())
1996 .await;
1997 assert_eq!(vec![execution_id.clone()], executed);
1998 let res = db_connection
2000 .wait_for_finished_result(
2001 &execution_id,
2002 Some(Box::pin(future::ready(TimeoutOutcome::Cancel))),
2003 )
2004 .await
2005 .unwrap();
2006 let variant = assert_matches!(res, SupportedFunctionReturnValue::Ok(variant) => variant);
2007
2008 insta::with_settings!({
2009 prepend_module_to_snapshot => false},
2010 {
2011 assert_json_snapshot!(variant);
2012 }
2013 );
2014 db_close.close().await;
2015 }
2016
2017 #[rstest]
2018 #[tokio::test]
2019 async fn permanent_error_variant_should_not_retry(
2020 #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
2021 locking_strategy: LockingStrategy,
2022 ) {
2023 test_utils::set_up();
2024 let sim_clock = SimClock::default();
2025 let (_guard, db_pool, db_close) = Database::Sqlite.set_up().await;
2026 let db_connection = db_pool.connection().await.unwrap();
2027 let retry_config = ComponentRetryConfig {
2029 max_retries: Some(1),
2030 retry_exp_backoff: Duration::from_millis(10),
2031 };
2032 let exec = new_activity_with_config(
2033 db_pool.clone(),
2034 test_programs_serde_activity_builder::TEST_PROGRAMS_SERDE_ACTIVITY,
2035 sim_clock.clone_box(),
2036 TokioSleep,
2037 move |component_id| ActivityConfig {
2038 component_id,
2039 forward_stdout: Some(StdOutputConfig::Stderr),
2040 forward_stderr: Some(StdOutputConfig::Stderr),
2041 env_vars: Arc::default(),
2042
2043 fuel: None,
2044 allowed_hosts: Arc::from([]),
2045 config_section_hint: ConfigSectionHint::ActivityWasm,
2046 },
2047 retry_config,
2048 locking_strategy,
2049 )
2050 .await;
2051 let ffqn = FunctionFqn::new_static_tuple(
2053 test_programs_serde_activity_builder::exports::testing::serde::serde::PERMANENT_ERR,
2054 );
2055 let execution_id = ExecutionId::generate();
2056 let created_at = sim_clock.now();
2057 db_connection
2058 .create(CreateRequest {
2059 created_at,
2060 execution_id: execution_id.clone(),
2061 ffqn,
2062 params: Params::empty(),
2063 parent: None,
2064 metadata: concepts::ExecutionMetadata::empty(),
2065 scheduled_at: created_at,
2066 component_id: exec.config.component_id.clone(),
2067
2068 deployment_id: DEPLOYMENT_ID_DUMMY,
2069 scheduled_by: None,
2070 paused: false,
2071 })
2072 .await
2073 .unwrap();
2074 let executed = exec
2075 .tick_test_await(sim_clock.now(), RunId::generate())
2076 .await;
2077 assert_eq!(vec![execution_id.clone()], executed);
2078 let res = db_connection
2080 .wait_for_finished_result(
2081 &execution_id,
2082 Some(Box::pin(future::ready(TimeoutOutcome::Cancel))),
2083 )
2084 .await
2085 .unwrap();
2086 let err = assert_matches!(res, SupportedFunctionReturnValue::Err(err) => err);
2088 let (key, _) = assert_matches!(
2089 err,
2090 Some(WastValWithType {
2091 value: WastVal::Variant(key, payload),
2092 ..
2093 }) => (key, payload)
2094 );
2095 assert_eq!("permanent_failure", key.as_snake_str());
2096 db_close.close().await;
2097 }
2098}