Skip to main content

obeli_sk_wasm_workers/activity/
activity_worker.rs

1use super::activity_ctx::{self, ActivityCtx};
2use crate::activity::cancel_registry::CancelRegistry;
3use crate::component_logger::{LogStrageConfig, log_activities};
4use crate::envvar::EnvVar;
5use crate::http_hooks::ConfigSectionHint;
6use crate::std_output_stream::{StdOutputConfig, StdOutputConfigWithSender};
7use crate::{RunnableComponent, WasmFileError};
8use async_trait::async_trait;
9use chrono::{DateTime, Utc};
10use concepts::storage::http_client_trace::HttpClientTrace;
11use concepts::storage::{LogInfoAppendRow, LogStreamType, Version};
12use concepts::time::{ClockFn, Sleep, now_tokio_instant};
13use concepts::{
14    ComponentId, FunctionFqn, PackageIfcFns, Params, SupportedFunctionReturnValue, TrapKind,
15};
16use concepts::{FunctionMetadata, ResultParsingError};
17use executor::worker::{FatalError, RunFinished, WorkerContext, WorkerResult, WorkerResultOk};
18use executor::worker::{Worker, WorkerError};
19use itertools::Itertools;
20use std::sync::Arc;
21use std::time::Duration;
22use tokio::sync::mpsc;
23use tracing::{debug, info, trace};
24use utils::wasm_tools::ExIm;
25use wasmtime::component::{ComponentExportIndex, InstancePre, Type};
26use wasmtime::{Engine, component::Val};
27use wasmtime::{Store, UpdateDeadline};
28
/// Per-component configuration of an activity, stored in [`ActivityWorkerCompiled`] and carried
/// over unchanged into the [`ActivityWorker`] built from it.
#[derive(Clone, Debug)]
pub struct ActivityConfig {
    /// Identifier of the component this configuration belongs to.
    pub component_id: ComponentId,
    /// Stdout forwarding setting; handed to `StdOutputConfigWithSender::new` together with
    /// `LogStreamType::StdOut` in `into_worker`.
    pub forward_stdout: Option<StdOutputConfig>,
    /// Stderr forwarding setting; handed to `StdOutputConfigWithSender::new` together with
    /// `LogStreamType::StdErr` in `into_worker`.
    pub forward_stderr: Option<StdOutputConfig>,
    /// Environment variables of the activity.
    /// NOTE(review): presumably exposed to the guest when the store is built — the consumer is
    /// outside this file, confirm there.
    pub env_vars: Arc<[EnvVar]>,
    /// When `Some`, the amount passed to `Store::set_fuel` for every run; when `None`, fuel is
    /// not set on the store.
    pub fuel: Option<u64>,
    /// Allowed-host rules for the component.
    /// NOTE(review): presumably applied to outgoing HTTP requests — enforcement is not visible
    /// in this file.
    pub allowed_hosts: Arc<[crate::http_request_policy::AllowedHostConfig]>,
    /// The TOML config section type for error messages
    pub config_section_hint: ConfigSectionHint,
}
40
/// An activity component that has been linked and pre-instantiated, but is not yet wired to a
/// cancel registry or log forwarding. Turn it into a runnable [`ActivityWorker`] with
/// [`ActivityWorkerCompiled::into_worker`].
#[derive(derive_more::Debug)]
pub struct ActivityWorkerCompiled {
    /// Engine the component was linked against; also used to create stores.
    #[debug(skip)]
    engine: Arc<Engine>,
    /// Result of `Linker::instantiate_pre`; instantiated once per run.
    #[debug(skip)]
    instance_pre: InstancePre<ActivityCtx>,
    /// Export/import metadata taken from the runnable component.
    exim: ExIm,
    /// Clock used to read the current time.
    #[debug(skip)]
    clock_fn: Box<dyn ClockFn>,
    /// Sleep implementation used to wait for the execution deadline.
    #[debug(skip)]
    sleep: Arc<dyn Sleep>,
    /// Lookup from an exported function's FFQN to its wasmtime export index.
    exported_ffqn_to_index: hashbrown::HashMap<FunctionFqn, ComponentExportIndex>,
    /// Configuration supplied by the caller of `new_with_config`.
    config: ActivityConfig,
}
55impl ActivityWorkerCompiled {
56    pub fn new_with_config(
57        runnable_component: RunnableComponent,
58        config: ActivityConfig,
59        engine: Arc<Engine>,
60        clock_fn: Box<dyn ClockFn>,
61        sleep: Arc<dyn Sleep>,
62    ) -> Result<Self, WasmFileError> {
63        let mut linker = wasmtime::component::Linker::new(&engine);
64        // wasi
65        wasmtime_wasi::p2::add_to_linker_async(&mut linker)
66            .map_err(|err| WasmFileError::linking_error("cannot link wasi", err))?;
67        // wasi-http
68        wasmtime_wasi_http::p2::add_only_http_to_linker_async(&mut linker)
69            .map_err(|err| WasmFileError::linking_error("cannot link wasi-http", err))?;
70        // obelisk:log
71        log_activities::obelisk::log::log::add_to_linker::<_, ActivityCtx>(&mut linker, |x| x)
72            .map_err(|err| WasmFileError::linking_error("cannot link obelisk:log", err))?;
73        // Attempt to pre-instantiate to catch missing imports
74        let instance_pre = linker
75            .instantiate_pre(&runnable_component.wasmtime_component)
76            .map_err(|err| WasmFileError::linking_error("cannot link activity", err))?;
77
78        let exported_ffqn_to_index = RunnableComponent::index_exported_functions(
79            &runnable_component.wasmtime_component,
80            &runnable_component.wasm_component.exim,
81        )
82        .map_err(WasmFileError::DecodeError)?;
83        Ok(Self {
84            engine,
85            exim: runnable_component.wasm_component.exim,
86            clock_fn,
87            sleep,
88            exported_ffqn_to_index,
89            config,
90            instance_pre,
91        })
92    }
93
94    #[must_use]
95    pub fn exported_functions_ext(&self) -> &[FunctionMetadata] {
96        self.exim.get_exports(true)
97    }
98
99    #[must_use]
100    pub fn exports_hierarchy_ext(&self) -> &[PackageIfcFns] {
101        self.exim.get_exports_hierarchy_ext()
102    }
103
104    #[must_use]
105    pub fn imported_functions(&self) -> &[FunctionMetadata] {
106        &self.exim.imports_flat
107    }
108
109    #[must_use]
110    pub fn into_worker(
111        self,
112        cancel_registry: CancelRegistry,
113        log_forwarder_sender: &mpsc::Sender<LogInfoAppendRow>,
114        logs_storage_config: Option<LogStrageConfig>,
115    ) -> ActivityWorker {
116        let stdout = StdOutputConfigWithSender::new(
117            self.config.forward_stdout,
118            log_forwarder_sender,
119            LogStreamType::StdOut,
120        );
121        let stderr = StdOutputConfigWithSender::new(
122            self.config.forward_stderr,
123            log_forwarder_sender,
124            LogStreamType::StdErr,
125        );
126        ActivityWorker {
127            engine: self.engine,
128            instance_pre: self.instance_pre,
129            exim: self.exim,
130            clock_fn: self.clock_fn,
131            sleep: self.sleep,
132            exported_ffqn_to_index: self.exported_ffqn_to_index,
133            config: self.config,
134            cancel_registry,
135            stdout,
136            stderr,
137            logs_storage_config,
138        }
139    }
140}
141
/// Runnable activity worker: a compiled component plus the runtime wiring (cancel registry,
/// stdout/stderr forwarding, log storage configuration) supplied to
/// `ActivityWorkerCompiled::into_worker`.
pub struct ActivityWorker {
    /// Engine used to create a store for each run.
    engine: Arc<Engine>,
    /// Pre-instantiated component; instantiated once per run.
    instance_pre: InstancePre<ActivityCtx>,
    /// Export/import metadata of the component.
    exim: ExIm,
    /// Clock used to obtain the start time of a run.
    clock_fn: Box<dyn ClockFn>,
    /// Sleep implementation used to wait for the execution deadline.
    sleep: Arc<dyn Sleep>,
    /// Lookup from an exported function's FFQN to its wasmtime export index.
    exported_ffqn_to_index: hashbrown::HashMap<FunctionFqn, ComponentExportIndex>,
    /// Component configuration (fuel, forwarding settings, ...).
    config: ActivityConfig,
    /// Source of the cancellation token obtained at the start of every run.
    cancel_registry: CancelRegistry,
    /// Stdout forwarding, if any; built per run from the execution id and run id.
    stdout: Option<StdOutputConfigWithSender>,
    /// Stderr forwarding, if any; built per run from the execution id and run id.
    stderr: Option<StdOutputConfigWithSender>,
    /// Cloned into every store created for a run.
    logs_storage_config: Option<LogStrageConfig>,
}
155
/// Read-only accessors for the component's export/import metadata.
impl ActivityWorker {
    /// Returns the exports reported by `ExIm::get_exports(true)`.
    /// NOTE(review): the `true` flag presumably includes extension functions (cf. the `_ext`
    /// suffix) — confirm against `ExIm`.
    #[must_use]
    pub fn exported_functions_ext(&self) -> &[FunctionMetadata] {
        self.exim.get_exports(true)
    }

    /// Returns the exports hierarchy reported by `ExIm::get_exports_hierarchy_ext`.
    #[must_use]
    pub fn exports_hierarchy_ext(&self) -> &[PackageIfcFns] {
        self.exim.get_exports_hierarchy_ext()
    }

    /// Returns the flat list of functions imported by the component.
    #[must_use]
    pub fn imported_functions(&self) -> &[FunctionMetadata] {
        &self.exim.imports_flat
    }
}
172
173#[async_trait]
174impl Worker for ActivityWorker {
175    fn exported_functions_noext(&self) -> &[FunctionMetadata] {
176        self.exim.get_exports(false)
177    }
178
179    async fn run(&self, ctx: WorkerContext) -> WorkerResult {
180        trace!("Context: {ctx:?}");
181        assert!(ctx.event_history.is_empty());
182        let cancelation_token = self
183            .cancel_registry
184            .obtain_cancellation_token(ctx.execution_id.clone());
185
186        let started_at = self.clock_fn.now();
187        ctx.worker_span.record(
188            "execution_deadline",
189            tracing::field::display(&ctx.locked_event.lock_expires_at),
190        );
191
192        let ffqn = ctx.ffqn.clone();
193        let params = ctx.params.clone();
194        let version = ctx.version.clone();
195        let worker_span = ctx.worker_span.clone();
196
197        let (mut store, deadline_duration) = match self.create_store(ctx, started_at) {
198            Ok(store) => store,
199            Err(err) => return WorkerResult::Err(err),
200        };
201
202        let stopwatch_for_reporting = now_tokio_instant(); // Not using `clock_fn` here is ok, value is only used for log reporting.
203
204        let call_function = {
205            let call_func_params = match self
206                .call_func_params(&ffqn, &params, &version, &mut store)
207                .await
208            {
209                Ok(ok) => ok,
210                Err(err) => return WorkerResult::Err(err),
211            };
212            self.call_func(&mut store, call_func_params)
213        };
214
215        tokio::select! { // future's liveness: Dropping the loser immediately.
216            res = call_function => {
217                let activity_ctx = store.into_data();
218                let res = self.process_res(res, &version, activity_ctx);
219                worker_span.in_scope(|| {
220                    match &res {
221                        Ok(worker_res_ok) => {
222                            info!(duration = ?stopwatch_for_reporting.elapsed(), "Run finished: {worker_res_ok}");
223                        }
224                        Err(WorkerError::ExecutorClosing(_)) => {
225                            info!("Executor closing");
226                        }
227                        Err(err) => {
228                            info!(%err, duration = ?stopwatch_for_reporting.elapsed(), "Run finished with an error");
229                        }
230                    }
231                });
232                return res;
233            },
234            ()  = self.sleep.sleep(deadline_duration) => {
235                let activity_ctx = store.into_data();
236                worker_span.in_scope(||
237                        info!(duration = ?stopwatch_for_reporting.elapsed(), %started_at,
238                        now = %self.clock_fn.now(),
239                        "Run timed out")
240                    );
241                let http_client_traces = Some(activity_ctx.http_hooks.http_client_traces
242                    .into_iter()
243                        .map(|(req, mut resp)| HttpClientTrace {
244                            req,
245                            resp: resp.try_recv().ok(),
246                        })
247                        .collect_vec());
248                return WorkerResult::Err(WorkerError::TemporaryTimeout{
249                    http_client_traces,
250                    version,
251                });
252            }
253            _signal = cancelation_token => {
254                // Either paused or cancelled by CancelRegistry, or timed out by `expired_timers_watcher`
255                // and Sender removed from CancelRegistry using its watcher.
256                // TODO: Add http traces
257                debug!("Activity run interrupted, DB must have been updated");
258                return WorkerResult::Ok(WorkerResultOk::DbUpdatedByWorkerOrWatcher);
259            }
260        }
261    }
262}
263
/// Everything `call_func` needs to invoke an exported function; produced by `call_func_params`.
struct CallFuncParams {
    /// The exported function looked up on the freshly created instance.
    func: wasmtime::component::Func,
    /// Call arguments already converted to wasmtime values.
    params: Arc<[Val]>,
    /// The single result type of the function, used to interpret the returned value.
    result_type: Type,
}
269
270impl ActivityWorker {
271    fn create_store(
272        &self,
273        ctx: WorkerContext,
274        started_at: DateTime<Utc>,
275    ) -> Result<(Store<ActivityCtx>, Duration /* deadline duration*/), WorkerError> {
276        let lock_expires_at = ctx.locked_event.lock_expires_at;
277        let worker_span = ctx.worker_span.clone();
278        let version = ctx.version.clone();
279
280        let stdout = self
281            .stdout
282            .as_ref()
283            .map(|it| it.build(&ctx.execution_id, ctx.locked_event.run_id));
284        let stderr = self
285            .stderr
286            .as_ref()
287            .map(|it| it.build(&ctx.execution_id, ctx.locked_event.run_id));
288
289        let mut store = activity_ctx::store(
290            &self.engine,
291            ctx,
292            &self.config,
293            self.clock_fn.clone_box(),
294            stdout,
295            stderr,
296            self.logs_storage_config.clone(),
297        );
298
299        // Set fuel.
300        if let Some(fuel) = self.config.fuel {
301            store
302                .set_fuel(fuel)
303                .expect("engine must have `consume_fuel` enabled");
304        }
305
306        // Configure epoch callback before running the initialization to avoid interruption
307        store.epoch_deadline_callback(|store_ctx| {
308            let executor_closing = *store_ctx.data().executor_close_watcher.borrow();
309            if executor_closing {
310                info!("Executor closing");
311                Ok(UpdateDeadline::Interrupt) // Interpreted as executor closing in `process_res`
312            } else {
313                Ok(UpdateDeadline::YieldCustom(
314                    1,
315                    Box::pin(tokio::task::yield_now()),
316                ))
317            }
318        });
319
320        let deadline_delta = lock_expires_at - started_at;
321        let Ok(deadline_duration) = deadline_delta.to_std() else {
322            worker_span.in_scope(|| {
323                info!(execution_deadline = %lock_expires_at, %started_at,
324                    "Timed out - started_at later than execution_deadline");
325            });
326            return Err(WorkerError::TemporaryTimeout {
327                http_client_traces: None,
328                version,
329            });
330        };
331        worker_span.record(
332            "deadline_duration",
333            tracing::field::debug(&deadline_duration),
334        );
335        Ok((store, deadline_duration))
336    }
337
338    async fn call_func_params(
339        &self,
340        ffqn: &FunctionFqn,
341        params: &Params,
342        version: &Version,
343        store: &mut Store<ActivityCtx>,
344    ) -> Result<CallFuncParams, WorkerError> {
345        let instance = match self.instance_pre.instantiate_async(&mut *store).await {
346            Ok(instance) => instance,
347            Err(err) => {
348                let reason = err.to_string();
349                if reason.starts_with("maximum concurrent") {
350                    return Err(WorkerError::LimitReached {
351                        reason,
352                        version: version.clone(),
353                    });
354                }
355                return Err(WorkerError::FatalError(
356                    FatalError::CannotInstantiate {
357                        reason: format!("cannot instantiate: {err}"),
358                        detail: Some(format!("{err:?}")),
359                    },
360                    version.clone(),
361                ));
362            }
363        };
364        let func = {
365            let fn_export_index = self
366                .exported_ffqn_to_index
367                .get(ffqn)
368                .expect("executor only calls `run` with ffqns that are exported");
369            instance
370                .get_func(&mut *store, fn_export_index)
371                .expect("exported function found with wit-parser but not with wasmtime")
372        };
373
374        let component_func = func.ty(store);
375        let params = match params.as_vals(component_func.params()) {
376            Ok(params) => params,
377            Err(err) => {
378                return Err(WorkerError::FatalError(
379                    FatalError::ParamsParsingError(err),
380                    version.clone(),
381                ));
382            }
383        };
384
385        let result_types = component_func.results().collect::<Vec<_>>(); // TODO: investigate using the iterator directly.
386        assert!(
387            result_types.len() == 1,
388            "multi-value and void results are not supported, must have been checked in function registry"
389        );
390
391        Ok(CallFuncParams {
392            func,
393            params,
394            result_type: result_types
395                .into_iter()
396                .next()
397                .expect("just checked that size == 1"),
398        })
399    }
400
401    async fn call_func(
402        &self,
403        store: &mut Store<ActivityCtx>,
404        CallFuncParams {
405            func,
406            params,
407            result_type,
408        }: CallFuncParams,
409    ) -> Result<Result<SupportedFunctionReturnValue, ResultParsingError>, wasmtime::Error> {
410        let mut results = vec![Val::Bool(false)];
411        let res = func
412            .call_async(&mut *store, &params, &mut results)
413            .await
414            .map(|()| {
415                (
416                    results.into_iter().next().expect("results size is 1"),
417                    result_type,
418                )
419            });
420        res.map(|(val, r#type)| SupportedFunctionReturnValue::new(val, r#type))
421    }
422
423    fn process_res(
424        &self,
425        res: Result<Result<SupportedFunctionReturnValue, ResultParsingError>, wasmtime::Error>,
426        version: &Version,
427        activity_ctx: ActivityCtx,
428    ) -> WorkerResult {
429        let http_client_traces = Some(
430            activity_ctx
431                .http_hooks
432                .http_client_traces
433                .into_iter()
434                .map(|(req, mut resp)| HttpClientTrace {
435                    req,
436                    resp: resp.try_recv().ok(),
437                })
438                .collect_vec(),
439        );
440        match res {
441            Ok(Ok(result)) => WorkerResult::Ok(WorkerResultOk::RunFinished(RunFinished {
442                retval: result,
443                version: version.clone(),
444                http_client_traces,
445            })),
446            Ok(Err(result_parsing_err)) => WorkerResult::Err(WorkerError::FatalError(
447                FatalError::ResultParsingError(result_parsing_err),
448                version.clone(),
449            )),
450            Err(err) => WorkerResult::Err(
451                if let Some(trap) = err
452                    .source()
453                    .and_then(|source| source.downcast_ref::<wasmtime::Trap>())
454                {
455                    if *trap == wasmtime::Trap::OutOfFuel {
456                        WorkerError::ActivityTrap {
457                            reason: format!(
458                                "total fuel consumed: {}",
459                                self.config
460                                    .fuel
461                                    .expect("must have been set as it was the reason of trap")
462                            ),
463                            detail: None,
464                            trap_kind: TrapKind::OutOfFuel,
465                            version: version.clone(),
466                            http_client_traces,
467                        }
468                    } else if *trap == wasmtime::Trap::Interrupt {
469                        WorkerError::ExecutorClosing(version.clone())
470                    } else {
471                        WorkerError::ActivityTrap {
472                            reason: trap.to_string(),
473                            detail: Some(format!("{err:?}")),
474                            trap_kind: TrapKind::Trap,
475                            version: version.clone(),
476                            http_client_traces,
477                        }
478                    }
479                } else {
480                    WorkerError::ActivityTrap {
481                        reason: err.to_string(),
482                        trap_kind: TrapKind::HostFunctionError,
483                        detail: Some(format!("{err:?}")),
484                        version: version.clone(),
485                        http_client_traces,
486                    }
487                },
488            ),
489        }
490    }
491}
492
#[cfg(any(test, feature = "test"))]
pub mod test {
    use concepts::{ComponentId, ComponentType, StrVariant, component_id::ComponentDigest};
    use utils::sha256sum::calculate_sha256_file;
    use wasmtime::Engine;

    use crate::{
        RunnableComponent,
        engines::{EngineConfig, Engines},
    };

    /// Compiles the WASM file at `wasm_path` as an activity using the on-demand test engine.
    ///
    /// Panics when the engine cannot be created or compilation fails.
    pub async fn compile_activity(wasm_path: &str) -> (RunnableComponent, ComponentId) {
        let test_engine =
            Engines::get_activity_engine_test(EngineConfig::on_demand_testing()).unwrap();
        let component_type = ComponentType::Activity;
        compile_activity_with_engine(wasm_path, &test_engine, component_type).await
    }

    /// Same as [`compile_activity`], but the component type is `ComponentType::ActivityStub`.
    #[allow(dead_code)] // false positive
    pub(crate) async fn compile_activity_stub(wasm_path: &str) -> (RunnableComponent, ComponentId) {
        let test_engine =
            Engines::get_activity_engine_test(EngineConfig::on_demand_testing()).unwrap();
        let component_type = ComponentType::ActivityStub;
        compile_activity_with_engine(wasm_path, &test_engine, component_type).await
    }

    /// Compiles the WASM file at `wasm_path` with the given engine.
    ///
    /// The returned [`ComponentId`] is built from `component_type`, an empty name and the SHA-256
    /// digest of the file. Panics when `component_type` is not an activity type or any step fails.
    pub(crate) async fn compile_activity_with_engine(
        wasm_path: &str,
        engine: &Engine,
        component_type: ComponentType,
    ) -> (RunnableComponent, ComponentId) {
        assert!(component_type.is_activity());
        let sha256 = calculate_sha256_file(wasm_path).await.unwrap();
        let digest = ComponentDigest(sha256.0);
        let component_id = ComponentId::new(component_type, StrVariant::empty(), digest).unwrap();
        let runnable_component =
            RunnableComponent::new(wasm_path, engine, component_type).unwrap();
        (runnable_component, component_id)
    }
}
534
535#[cfg(test)]
536pub(crate) mod tests {
537    use super::*;
538    use crate::activity::activity_worker::test::compile_activity_with_engine;
539    use crate::engines::PoolingOptions;
540    use crate::engines::{EngineConfig, Engines};
541    use crate::http_hooks::ConfigSectionHint;
542    use crate::http_request_policy::{AllowedHostConfig, HostPattern, MethodsPattern};
543    use assert_matches::assert_matches;
544    use concepts::prefixed_ulid::{DEPLOYMENT_ID_DUMMY, RunId};
545    use concepts::storage::Locked;
546    use concepts::storage::http_client_trace::{RequestTrace, ResponseTrace};
547    use concepts::storage::{DbPool, TimeoutOutcome};
548    use concepts::storage::{ExecutionRequest, Version};
549    use concepts::time::Now;
550    use concepts::time::TokioSleep;
551    use concepts::{ComponentRetryConfig, ComponentType};
552    use concepts::{
553        ExecutionFailureKind, FinishedExecutionFailure, SUPPORTED_RETURN_VALUE_OK_EMPTY,
554    };
555    use concepts::{
556        ExecutionId, FunctionFqn, Params, SupportedFunctionReturnValue, prefixed_ulid::ExecutorId,
557        storage::CreateRequest, storage::DbPoolCloseable,
558    };
559    use db_tests::Database;
560    use executor::executor::LockingStrategy;
561    use executor::executor::{ExecConfig, ExecTask};
562    use insta::assert_json_snapshot;
563    use rstest::rstest;
564    use serde_json::json;
565    use std::future;
566    use std::time::Duration;
567    use test_utils::env_or_default;
568    use test_utils::sim_clock::SimClock;
569    use tracing::{debug, info, info_span};
570    use val_json::{
571        type_wrapper::TypeWrapper,
572        wast_val::{WastVal, WastValWithType},
573    };
574
    /// FFQN built from the sleep test activity's `SLEEP_LOOP` export.
    pub const SLEEP_LOOP_ACTIVITY_FFQN: FunctionFqn = FunctionFqn::new_static_tuple(
        test_programs_sleep_activity_builder::exports::testing::sleep::sleep::SLEEP_LOOP,
    ); // sleep-loop: func(millis: u64, iterations: u32);
    /// FFQN built from the HTTP GET test activity's `GET_SUCCESSFUL` export.
    pub const HTTP_GET_SUCCESSFUL_ACTIVITY: FunctionFqn = FunctionFqn::new_static_tuple(
        test_programs_http_get_activity_builder::exports::testing::http::http_get::GET_SUCCESSFUL,
    );

    /// FFQN built from the fibo test activity's `FIBO` export.
    pub const FIBO_ACTIVITY_FFQN: FunctionFqn = FunctionFqn::new_static_tuple(
        test_programs_fibo_activity_builder::exports::testing::fibo::fibo::FIBO,
    ); // func(n: u8) -> u64;
    /// Sample input for the fibo activity.
    pub const FIBO_10_INPUT: u8 = 10;
    /// Expected fibo result for [`FIBO_10_INPUT`] (the 10th Fibonacci number).
    pub const FIBO_10_OUTPUT: u64 = 55;
587
588    fn activity_config(component_id: ComponentId) -> ActivityConfig {
589        ActivityConfig {
590            component_id,
591            forward_stdout: None,
592            forward_stderr: None,
593            env_vars: Arc::from([]),
594
595            fuel: None,
596            allowed_hosts: Arc::from([]),
597            config_section_hint: ConfigSectionHint::ActivityWasm,
598        }
599    }
600
601    pub(crate) fn activity_config_allowed_host(
602        component_id: ComponentId,
603        allowed_host: &str,
604    ) -> ActivityConfig {
605        ActivityConfig {
606            component_id,
607            forward_stdout: None,
608            forward_stderr: None,
609            env_vars: Arc::from([]),
610
611            fuel: None,
612            allowed_hosts: Arc::from(vec![AllowedHostConfig {
613                pattern: HostPattern::parse_with_methods(allowed_host, MethodsPattern::AllMethods)
614                    .unwrap(),
615                secret_env_mappings: Vec::new(),
616                replace_in: hashbrown::HashSet::new(),
617            }]),
618            config_section_hint: ConfigSectionHint::ActivityWasm,
619        }
620    }
621
622    pub(crate) async fn new_activity_worker(
623        wasm_path: &str,
624        engine: Arc<Engine>,
625        clock_fn: Box<dyn ClockFn>,
626        sleep: impl Sleep + 'static,
627    ) -> (Arc<dyn Worker>, ComponentId) {
628        new_activity_worker_with_config(wasm_path, engine, clock_fn, sleep, activity_config).await
629    }
630
631    async fn new_activity_worker_with_config(
632        wasm_path: &str,
633        engine: Arc<Engine>,
634        clock_fn: Box<dyn ClockFn>,
635        sleep: impl Sleep + 'static,
636        config_fn: impl FnOnce(ComponentId) -> ActivityConfig,
637    ) -> (Arc<dyn Worker>, ComponentId) {
638        let cancel_registry = CancelRegistry::new();
639        let (wasm_component, component_id) =
640            compile_activity_with_engine(wasm_path, &engine, ComponentType::Activity).await;
641        let (db_forwarder_sender, _) = mpsc::channel(1);
642        (
643            Arc::new(
644                ActivityWorkerCompiled::new_with_config(
645                    wasm_component,
646                    config_fn(component_id.clone()),
647                    engine,
648                    clock_fn,
649                    Arc::new(sleep),
650                )
651                .unwrap()
652                .into_worker(cancel_registry, &db_forwarder_sender, None),
653            ),
654            component_id,
655        )
656    }
657
658    pub(crate) async fn new_activity(
659        db_pool: Arc<dyn DbPool>,
660        wasm_path: &'static str,
661        clock_fn: Box<dyn ClockFn>,
662        sleep: impl Sleep + 'static,
663        retry_config: ComponentRetryConfig,
664        locking_strategy: LockingStrategy,
665    ) -> ExecTask {
666        new_activity_with_config(
667            db_pool,
668            wasm_path,
669            clock_fn,
670            sleep,
671            activity_config,
672            retry_config,
673            locking_strategy,
674        )
675        .await
676    }
677
678    pub(crate) async fn new_activity_with_config(
679        db_pool: Arc<dyn DbPool>,
680        wasm_path: &'static str,
681        clock_fn: Box<dyn ClockFn>,
682        sleep: impl Sleep + 'static,
683        config_fn: impl FnOnce(ComponentId) -> ActivityConfig,
684        retry_config: ComponentRetryConfig,
685        locking_strategy: LockingStrategy,
686    ) -> ExecTask {
687        let engine = Engines::get_activity_engine_test(EngineConfig::on_demand_testing()).unwrap();
688        let (worker, component_id) = new_activity_worker_with_config(
689            wasm_path,
690            engine,
691            clock_fn.clone_box(),
692            sleep,
693            config_fn,
694        )
695        .await;
696        let exec_config = ExecConfig {
697            batch_size: 1,
698            lock_expiry: Duration::from_secs(1),
699            tick_sleep: Duration::ZERO,
700            component_id,
701            task_limiter_global: None,
702            task_limiter_local: None,
703            executor_id: ExecutorId::generate(),
704            retry_config,
705            locking_strategy,
706        };
707        ExecTask::new_all_ffqns_test(worker, exec_config, clock_fn, db_pool)
708    }
709
710    pub(crate) async fn new_activity_fibo(
711        db_pool: Arc<dyn DbPool>,
712        clock_fn: Box<dyn ClockFn>,
713        sleep: impl Sleep + 'static,
714        locking_strategy: LockingStrategy,
715    ) -> ExecTask {
716        new_activity(
717            db_pool,
718            test_programs_fibo_activity_builder::TEST_PROGRAMS_FIBO_ACTIVITY,
719            clock_fn,
720            sleep,
721            ComponentRetryConfig::ZERO,
722            locking_strategy,
723        )
724        .await
725    }
726
    /// Selects the scenario exercised by `run_http_get_retry_test`.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub(crate) enum TestRetryBehavior {
        /// NOTE(review): presumably the retried attempt is expected to succeed — confirm in
        /// `run_http_get_retry_test`.
        SucceedOnRetry,
        /// NOTE(review): presumably the retried attempt is expected to fail with an error
        /// matching `expected_retry_err` — confirm in `run_http_get_retry_test`.
        Fail { expected_retry_err: &'static str },
    }
732
733    /// Common test helper for testing HTTP GET retry behavior on fallible errors.
734    ///
735    /// This function tests that:
736    /// 1. An activity that returns an error (from a 500 response) triggers a temporary failure with backoff
737    /// 2. After the backoff expires, the activity is retried
    /// 3. On retry, depending on `test_retry_behavior`, either succeeds (`TestRetryBehavior::SucceedOnRetry`) or fails permanently (`TestRetryBehavior::Fail`)
739    pub(crate) async fn run_http_get_retry_test(
740        listener: std::net::TcpListener,
741        worker: Arc<dyn Worker>,
742        ffqn: FunctionFqn,
743        make_params: impl FnOnce(&str) -> Params,
744        locking_strategy: LockingStrategy,
745        expected_err_contains: &str,
746        test_retry_behavior: TestRetryBehavior,
747    ) {
748        use std::ops::Deref;
749        use wiremock::{
750            Mock, MockServer, ResponseTemplate,
751            matchers::{method, path},
752        };
753
754        const BODY: &str = "ok";
755        const RETRY_EXP_BACKOFF: Duration = Duration::from_millis(10);
756
757        let sim_clock = SimClock::default();
758        let (_guard, db_pool, db_close) = Database::Sqlite.set_up().await;
759
760        let server_address = listener
761            .local_addr()
762            .expect("Failed to get server address.");
763        let uri = format!("http://127.0.0.1:{port}", port = server_address.port());
764
765        let retry_config = ComponentRetryConfig {
766            max_retries: Some(1),
767            retry_exp_backoff: RETRY_EXP_BACKOFF,
768        };
769        let exec_config = ExecConfig {
770            batch_size: 1,
771            lock_expiry: Duration::from_secs(1),
772            tick_sleep: Duration::ZERO,
773            component_id: ComponentId::dummy_activity(),
774            task_limiter_global: None,
775            task_limiter_local: None,
776            executor_id: ExecutorId::generate(),
777            retry_config,
778            locking_strategy,
779        };
780        let ffqns = Arc::from([ffqn.clone()]);
781        let exec_task = ExecTask::new_test(
782            exec_config,
783            worker,
784            sim_clock.clone_box(),
785            db_pool.clone(),
786            ffqns,
787        );
788
789        let params = make_params(&uri);
790        let execution_id = ExecutionId::generate();
791        let created_at = sim_clock.now();
792        let db_connection = db_pool.connection_test().await.unwrap();
793        db_connection
794            .create(CreateRequest {
795                created_at,
796                execution_id: execution_id.clone(),
797                ffqn: ffqn.clone(),
798                params,
799                parent: None,
800                metadata: concepts::ExecutionMetadata::empty(),
801                scheduled_at: created_at,
802                component_id: ComponentId::dummy_activity(),
803                deployment_id: DEPLOYMENT_ID_DUMMY,
804                scheduled_by: None,
805                paused: false,
806            })
807            .await
808            .unwrap();
809
810        let server = MockServer::builder().listener(listener).start().await;
811        Mock::given(method("GET"))
812            .and(path("/"))
813            .respond_with(ResponseTemplate::new(500).set_body_string(BODY))
814            .expect(1)
815            .mount(&server)
816            .await;
817        debug!("started mock server on {}", server.address());
818
819        {
820            // Expect error result to be interpreted as a temporary failure
821            assert_eq!(
822                1,
823                exec_task
824                    .tick_test(sim_clock.now(), RunId::generate())
825                    .await
826                    .wait_for_tasks()
827                    .await
828                    .len()
829            );
830            let exec_log = db_connection.get(&execution_id).await.unwrap();
831
832            let (reason, detail, found_expires_at, http_client_traces) = assert_matches!(
833                &exec_log.last_event().event,
834                ExecutionRequest::TemporarilyFailed {
835                    backoff_expires_at,
836                    reason,
837                    detail: Some(detail),
838                    http_client_traces: Some(http_client_traces)
839                }
840                => (reason, detail, *backoff_expires_at, http_client_traces)
841            );
842            assert_eq!(sim_clock.now() + RETRY_EXP_BACKOFF, found_expires_at);
843            assert_eq!("activity returned error", reason.deref());
844            assert!(
845                detail.contains(expected_err_contains),
846                "Unexpected detail: {detail}, expected to contain: {expected_err_contains}"
847            );
848
849            assert_eq!(1, http_client_traces.len());
850            let http_client_trace = http_client_traces.iter().next().unwrap();
851            let (method_actual, uri_actual) = assert_matches!(
852                http_client_trace,
853                HttpClientTrace {
854                    req: RequestTrace {
855                        method,
856                        sent_at: _,
857                        uri
858                    },
859                    resp: Some(ResponseTrace {
860                        status: Ok(500),
861                        finished_at: _
862                    })
863                }
864                => (method, uri)
865            );
866            assert_eq!("GET", method_actual);
867            assert_eq!(format!("{uri}/"), *uri_actual);
868            server.verify().await;
869        }
870
871        // Noop until the timeout expires
872        assert_eq!(
873            0,
874            exec_task
875                .tick_test(sim_clock.now(), RunId::generate())
876                .await
877                .wait_for_tasks()
878                .await
879                .len()
880        );
881        sim_clock.move_time_forward(RETRY_EXP_BACKOFF);
882
883        server.reset().await;
884
885        if test_retry_behavior == TestRetryBehavior::SucceedOnRetry {
886            // Reconfigure the server, return 200
887            Mock::given(method("GET"))
888                .and(path("/"))
889                .respond_with(ResponseTemplate::new(200).set_body_string(BODY))
890                .expect(1)
891                .mount(&server)
892                .await;
893            debug!("Reconfigured the server");
894        } // otherwise return 404
895
896        assert_eq!(
897            1,
898            exec_task
899                .tick_test(sim_clock.now(), RunId::generate())
900                .await
901                .wait_for_tasks()
902                .await
903                .len()
904        );
905        let exec_log = db_connection.get(&execution_id).await.unwrap();
906        let res = assert_matches!(exec_log.last_event().event.clone(), ExecutionRequest::Finished { retval, .. } => retval);
907        let wast_val_with_type = match test_retry_behavior {
908            TestRetryBehavior::SucceedOnRetry => {
909                let wast_val_with_type = assert_matches!(res, SupportedFunctionReturnValue::Ok(Some(wast_val_with_type)) => wast_val_with_type);
910                let val = assert_matches!(&wast_val_with_type.value, WastVal::String(val) => val);
911                assert_eq!(BODY, val.deref());
912                wast_val_with_type
913            }
914            TestRetryBehavior::Fail { expected_retry_err } => {
915                let wast_val_with_type = assert_matches!(res, SupportedFunctionReturnValue::Err(Some(wast_val_with_type)) => wast_val_with_type);
916                let val = assert_matches!(&wast_val_with_type.value, WastVal::String(val) => val);
917                assert_eq!(expected_retry_err, val.deref());
918                wast_val_with_type
919            }
920        };
921        // check types
922        assert_matches!(wast_val_with_type.r#type, TypeWrapper::String); // in both cases
923        drop(db_connection);
924        drop(exec_task);
925        db_close.close().await;
926    }
927
928    /// Creates an activity worker with allowed host configuration.
929    /// Returns the worker and the URI for the allowed host.
930    pub(crate) async fn create_activity_worker_with_allowed_host(
931        wasm_path: &str,
932        listener: &std::net::TcpListener,
933    ) -> Arc<dyn Worker> {
934        let engine = Engines::get_activity_engine_test(EngineConfig::on_demand_testing()).unwrap();
935        let sim_clock = SimClock::default();
936        let server_address = listener
937            .local_addr()
938            .expect("Failed to get server address.");
939        let uri = format!("http://127.0.0.1:{port}", port = server_address.port());
940
941        let (worker, _) = new_activity_worker_with_config(
942            wasm_path,
943            engine,
944            sim_clock.clone_box(),
945            TokioSleep,
946            {
947                let uri = uri.clone();
948                move |component_id| activity_config_allowed_host(component_id, &uri)
949            },
950        )
951        .await;
952        worker
953    }
954
955    #[rstest]
956    #[tokio::test]
957    async fn fibo_once(
958        #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
959        locking_strategy: LockingStrategy,
960    ) {
961        test_utils::set_up();
962        let sim_clock = SimClock::default();
963        let (_guard, db_pool, db_close) = Database::Sqlite.set_up().await;
964        let db_connection = db_pool.connection().await.unwrap();
965        let exec = new_activity_fibo(
966            db_pool.clone(),
967            sim_clock.clone_box(),
968            TokioSleep,
969            locking_strategy,
970        )
971        .await;
972        // Create an execution.
973        let execution_id = ExecutionId::generate();
974        let created_at = sim_clock.now();
975        let params = Params::from_json_values_test(vec![json!(FIBO_10_INPUT)]);
976        db_connection
977            .create(CreateRequest {
978                created_at,
979                execution_id: execution_id.clone(),
980                ffqn: FIBO_ACTIVITY_FFQN,
981                params,
982                parent: None,
983                metadata: concepts::ExecutionMetadata::empty(),
984                scheduled_at: created_at,
985                component_id: exec.config.component_id.clone(),
986
987                deployment_id: DEPLOYMENT_ID_DUMMY,
988                scheduled_by: None,
989                paused: false,
990            })
991            .await
992            .unwrap();
993        // tick
994        let executed = exec
995            .tick_test_await(sim_clock.now(), RunId::generate())
996            .await;
997        assert_eq!(vec![execution_id.clone()], executed);
998        // Check the result.
999        let res = db_connection
1000            .wait_for_finished_result(
1001                &execution_id,
1002                Some(Box::pin(future::ready(TimeoutOutcome::Cancel))),
1003            )
1004            .await
1005            .unwrap();
1006        let res = assert_matches!(res, SupportedFunctionReturnValue::Ok(ok) => ok);
1007        let fibo = assert_matches!(res,
1008            Some(WastValWithType {value: WastVal::U64(val), r#type: TypeWrapper::U64 }) => val);
1009        assert_eq!(FIBO_10_OUTPUT, fibo);
1010        drop(db_connection);
1011        db_close.close().await;
1012    }
1013
1014    #[tokio::test]
1015    async fn limit_reached() {
1016        const FIBO_INPUT: u8 = 10;
1017        const LOCK_EXPIRY_MILLIS: u64 = 1100;
1018        const TASKS: u32 = 10;
1019        const MAX_INSTANCES: u32 = 1;
1020
1021        test_utils::set_up();
1022        let fibo_input = env_or_default("FIBO_INPUT", FIBO_INPUT);
1023        let lock_expiry =
1024            Duration::from_millis(env_or_default("LOCK_EXPIRY_MILLIS", LOCK_EXPIRY_MILLIS));
1025        let tasks = env_or_default("TASKS", TASKS);
1026        let max_instances = env_or_default("MAX_INSTANCES", MAX_INSTANCES);
1027
1028        let pool_opts = PoolingOptions {
1029            pooling_total_component_instances: Some(max_instances),
1030            pooling_total_stacks: Some(max_instances),
1031            pooling_total_core_instances: Some(max_instances),
1032            pooling_total_memories: Some(max_instances),
1033            pooling_total_tables: Some(max_instances),
1034            ..Default::default()
1035        };
1036
1037        let engine =
1038            Engines::get_activity_engine_test(EngineConfig::pooling_nocache_testing(pool_opts))
1039                .unwrap();
1040
1041        let (fibo_worker, _) = new_activity_worker(
1042            test_programs_fibo_activity_builder::TEST_PROGRAMS_FIBO_ACTIVITY,
1043            engine,
1044            Now.clone_box(),
1045            TokioSleep,
1046        )
1047        .await;
1048        // create executions
1049        let join_handles = (0..tasks)
1050            .map(|_| {
1051                let fibo_worker = fibo_worker.clone();
1052                let execution_id = ExecutionId::generate();
1053                let ctx = WorkerContext {
1054                    execution_id: execution_id.clone(),
1055                    metadata: concepts::ExecutionMetadata::empty(),
1056                    component_digest: ComponentId::dummy_activity().component_digest,
1057                    ffqn: FIBO_ACTIVITY_FFQN,
1058                    params: Params::from_json_values_test(vec![json!(fibo_input)]),
1059                    event_history: Vec::new(),
1060                    responses: Vec::new(),
1061                    parent: None,
1062                    version: Version::new(0),
1063                    can_be_retried: false,
1064                    worker_span: info_span!("worker-test"),
1065                    locked_event: Locked {
1066                        component_id: ComponentId::dummy_activity(),
1067                        executor_id: ExecutorId::generate(),
1068                        deployment_id: DEPLOYMENT_ID_DUMMY,
1069                        run_id: RunId::generate(),
1070                        lock_expires_at: Now.now() + lock_expiry,
1071                        retry_config: ComponentRetryConfig::ZERO,
1072                    },
1073                    executor_close_watcher: tokio::sync::watch::channel(false).1,
1074                };
1075                tokio::spawn(async move { fibo_worker.run(ctx).await })
1076            })
1077            .collect::<Vec<_>>();
1078        let mut limit_reached = 0;
1079        for jh in join_handles {
1080            if matches!(
1081                jh.await.unwrap(),
1082                WorkerResult::Err(WorkerError::LimitReached { .. })
1083            ) {
1084                limit_reached += 1;
1085            }
1086        }
1087        assert!(limit_reached > 0, "Limit was not reached");
1088    }
1089
1090    #[rstest::rstest]
1091    #[case(
1092            10,
1093            100,
1094            SupportedFunctionReturnValue::ExecutionFailure(FinishedExecutionFailure{
1095                kind: ExecutionFailureKind::TimedOut,
1096                reason: None, detail: None
1097            })
1098        )] // 1s -> timeout
1099    #[case(10, 10, SUPPORTED_RETURN_VALUE_OK_EMPTY)] // 0.1s -> Ok
1100    #[case(
1101            1500,
1102            1,
1103            SupportedFunctionReturnValue::ExecutionFailure(FinishedExecutionFailure{
1104                kind: ExecutionFailureKind::TimedOut,
1105                reason: None, detail: None
1106            })
1107        )] // 1s -> timeout
1108    #[tokio::test]
1109    async fn sleep_should_produce_temporary_timeout(
1110        #[case] sleep_millis: u32,
1111        #[case] sleep_iterations: u32,
1112        #[case] expected: concepts::SupportedFunctionReturnValue,
1113        #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
1114        locking_strategy: LockingStrategy,
1115    ) {
1116        const LOCK_EXPIRY: Duration = Duration::from_millis(500);
1117        test_utils::set_up();
1118        let sim_clock = SimClock::default();
1119        let (_guard, db_pool, db_close) = Database::Sqlite.set_up().await;
1120        let engine = Engines::get_activity_engine_test(EngineConfig::on_demand_testing()).unwrap();
1121        let (worker, _) = new_activity_worker(
1122            test_programs_sleep_activity_builder::TEST_PROGRAMS_SLEEP_ACTIVITY,
1123            engine,
1124            sim_clock.clone_box(),
1125            TokioSleep,
1126        )
1127        .await;
1128
1129        let exec_config = ExecConfig {
1130            batch_size: 1,
1131            lock_expiry: LOCK_EXPIRY,
1132            tick_sleep: Duration::ZERO,
1133            component_id: ComponentId::dummy_activity(),
1134            task_limiter_global: None,
1135            task_limiter_local: None,
1136            executor_id: ExecutorId::generate(),
1137            retry_config: ComponentRetryConfig::ZERO,
1138            locking_strategy,
1139        };
1140        let ffqns = Arc::from([SLEEP_LOOP_ACTIVITY_FFQN]);
1141        let exec_task = ExecTask::new_test(
1142            exec_config,
1143            worker,
1144            sim_clock.clone_box(),
1145            db_pool.clone(),
1146            ffqns,
1147        );
1148
1149        // Create an execution.
1150        let execution_id = ExecutionId::generate();
1151        info!("Testing {execution_id}");
1152        let created_at = sim_clock.now();
1153        let db_connection = db_pool.connection().await.unwrap();
1154        db_connection
1155            .create(CreateRequest {
1156                created_at,
1157                execution_id: execution_id.clone(),
1158                ffqn: SLEEP_LOOP_ACTIVITY_FFQN,
1159                params: Params::from_json_values_test(vec![
1160                    json!(
1161                        {"milliseconds": sleep_millis}),
1162                    json!(sleep_iterations),
1163                ]),
1164                parent: None,
1165                metadata: concepts::ExecutionMetadata::empty(),
1166                scheduled_at: created_at,
1167                component_id: ComponentId::dummy_activity(),
1168                deployment_id: DEPLOYMENT_ID_DUMMY,
1169                scheduled_by: None,
1170                paused: false,
1171            })
1172            .await
1173            .unwrap();
1174
1175        // Run the execution via tick.
1176        assert_eq!(
1177            1,
1178            exec_task
1179                .tick_test(sim_clock.now(), RunId::generate())
1180                .await
1181                .wait_for_tasks()
1182                .await
1183                .len()
1184        );
1185
1186        // Check the result.
1187        let exec_log = db_connection.get(&execution_id).await.unwrap();
1188        let retval = assert_matches!(
1189            exec_log.last_event().event.clone(),
1190            ExecutionRequest::Finished { retval, .. } => retval
1191        );
1192        assert_eq!(expected, retval);
1193
1194        drop(exec_task);
1195        db_close.close().await;
1196    }
1197
1198    #[rstest::rstest]
1199    #[case(1, 2_000)] // 1ms * 2000 iterations
1200    #[case(2_000, 1)] // 2s * 1 iteration
1201    #[tokio::test]
1202    async fn long_running_execution_should_timeout(
1203        #[case] sleep_millis: u64,
1204        #[case] sleep_iterations: u32,
1205    ) {
1206        const TIMEOUT: Duration = Duration::from_millis(200);
1207        test_utils::set_up();
1208
1209        let engine = Engines::get_activity_engine_test(EngineConfig::on_demand_testing()).unwrap();
1210
1211        let sim_clock = SimClock::epoch();
1212        let (worker, _) = new_activity_worker(
1213            test_programs_sleep_activity_builder::TEST_PROGRAMS_SLEEP_ACTIVITY,
1214            engine,
1215            sim_clock.clone_box(),
1216            TokioSleep,
1217        )
1218        .await;
1219
1220        let executed_at = sim_clock.now();
1221        let version = Version::new(10);
1222        let ctx = WorkerContext {
1223            execution_id: ExecutionId::generate(),
1224            metadata: concepts::ExecutionMetadata::empty(),
1225            component_digest: ComponentId::dummy_activity().component_digest,
1226            ffqn: SLEEP_LOOP_ACTIVITY_FFQN,
1227            params: Params::from_json_values_test(vec![
1228                json!(
1229                    {"milliseconds": sleep_millis}),
1230                json!(sleep_iterations),
1231            ]),
1232            event_history: Vec::new(),
1233            responses: Vec::new(),
1234            parent: None,
1235            version: version.clone(),
1236            can_be_retried: false,
1237            worker_span: info_span!("worker-test"),
1238            locked_event: Locked {
1239                component_id: ComponentId::dummy_activity(),
1240                executor_id: ExecutorId::generate(),
1241                deployment_id: DEPLOYMENT_ID_DUMMY,
1242                run_id: RunId::generate(),
1243                lock_expires_at: executed_at + TIMEOUT,
1244                retry_config: ComponentRetryConfig::ZERO,
1245            },
1246            executor_close_watcher: tokio::sync::watch::channel(false).1,
1247        };
1248        let WorkerResult::Err(err) = worker.run(ctx).await else {
1249            panic!()
1250        };
1251        let actual_version = assert_matches!(
1252            err,
1253            WorkerError::TemporaryTimeout {
1254                http_client_traces:_,
1255                version
1256            }
1257            => version
1258        );
1259        assert_eq!(version, actual_version);
1260    }
1261
1262    #[tokio::test]
1263    async fn execution_deadline_before_now_should_timeout() {
1264        test_utils::set_up();
1265
1266        let engine = Engines::get_activity_engine_test(EngineConfig::on_demand_testing()).unwrap();
1267        let sim_clock = SimClock::epoch();
1268        let (worker, _) = new_activity_worker(
1269            test_programs_sleep_activity_builder::TEST_PROGRAMS_SLEEP_ACTIVITY,
1270            engine,
1271            sim_clock.clone_box(),
1272            TokioSleep,
1273        )
1274        .await;
1275        // simulate a scheduling problem where deadline < now
1276        let execution_deadline = sim_clock.now();
1277        sim_clock.move_time_forward(Duration::from_millis(100));
1278        let version = Version::new(10);
1279        let ctx = WorkerContext {
1280            execution_id: ExecutionId::generate(),
1281            metadata: concepts::ExecutionMetadata::empty(),
1282            component_digest: ComponentId::dummy_activity().component_digest,
1283            ffqn: SLEEP_LOOP_ACTIVITY_FFQN,
1284            params: Params::from_json_values_test(vec![
1285                json!(
1286                    {"milliseconds": 1}),
1287                json!(1),
1288            ]),
1289            event_history: Vec::new(),
1290            responses: Vec::new(),
1291            parent: None,
1292            version: version.clone(),
1293            can_be_retried: false,
1294            worker_span: info_span!("worker-test"),
1295            locked_event: Locked {
1296                component_id: ComponentId::dummy_activity(),
1297                executor_id: ExecutorId::generate(),
1298                deployment_id: DEPLOYMENT_ID_DUMMY,
1299                run_id: RunId::generate(),
1300                lock_expires_at: execution_deadline,
1301                retry_config: ComponentRetryConfig::ZERO,
1302            },
1303            executor_close_watcher: tokio::sync::watch::channel(false).1,
1304        };
1305        let WorkerResult::Err(err) = worker.run(ctx).await else {
1306            panic!()
1307        };
1308        let actual_version = assert_matches!(
1309            err,
1310            WorkerError::TemporaryTimeout {
1311                http_client_traces: None,
1312                version: actual_version,
1313            }
1314            => actual_version
1315        );
1316        assert_eq!(version, actual_version);
1317    }
1318
1319    #[rstest]
1320    #[tokio::test]
1321    async fn http_get_simple(
1322        #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
1323        locking_strategy: LockingStrategy,
1324    ) {
1325        use std::ops::Deref;
1326        use wiremock::{
1327            Mock, MockServer, ResponseTemplate,
1328            matchers::{method, path},
1329        };
1330        const BODY: &str = "ok";
1331        test_utils::set_up();
1332        info!("All set up");
1333        let sim_clock = SimClock::default();
1334        let (_guard, db_pool, db_close) = Database::Sqlite.set_up().await;
1335        let engine = Engines::get_activity_engine_test(EngineConfig::on_demand_testing()).unwrap();
1336
1337        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1338        let server_address = listener
1339            .local_addr()
1340            .expect("Failed to get server address.");
1341        let uri = format!("http://127.0.0.1:{port}", port = server_address.port());
1342
1343        let (worker, _) = new_activity_worker_with_config(
1344            test_programs_http_get_activity_builder::TEST_PROGRAMS_HTTP_GET_ACTIVITY,
1345            engine,
1346            sim_clock.clone_box(),
1347            TokioSleep,
1348            {
1349                let uri = uri.clone();
1350                move |component_id| activity_config_allowed_host(component_id, &uri)
1351            },
1352        )
1353        .await;
1354
1355        let exec_config = ExecConfig {
1356            batch_size: 1,
1357            lock_expiry: Duration::from_secs(1),
1358            tick_sleep: Duration::ZERO,
1359            component_id: ComponentId::dummy_activity(),
1360            task_limiter_global: None,
1361            task_limiter_local: None,
1362            executor_id: ExecutorId::generate(),
1363            retry_config: ComponentRetryConfig::ZERO,
1364            locking_strategy,
1365        };
1366        let ffqns = Arc::from([HTTP_GET_SUCCESSFUL_ACTIVITY]);
1367        let exec_task = ExecTask::new_test(
1368            exec_config,
1369            worker,
1370            sim_clock.clone_box(),
1371            db_pool.clone(),
1372            ffqns,
1373        );
1374
1375        let params = Params::from_json_values_test(vec![json!(uri.clone())]);
1376        let execution_id = ExecutionId::generate();
1377        let created_at = sim_clock.now();
1378        let db_connection = db_pool.connection_test().await.unwrap();
1379        info!("Creating execution");
1380        let stopwatch = std::time::Instant::now();
1381        db_connection
1382            .create(CreateRequest {
1383                created_at,
1384                execution_id: execution_id.clone(),
1385                ffqn: HTTP_GET_SUCCESSFUL_ACTIVITY,
1386                params,
1387                parent: None,
1388                metadata: concepts::ExecutionMetadata::empty(),
1389                scheduled_at: created_at,
1390                component_id: ComponentId::dummy_activity(),
1391
1392                deployment_id: DEPLOYMENT_ID_DUMMY,
1393                scheduled_by: None,
1394                paused: false,
1395            })
1396            .await
1397            .unwrap();
1398
1399        let server = MockServer::builder().listener(listener).start().await;
1400        Mock::given(method("GET"))
1401            .and(path("/"))
1402            .respond_with(ResponseTemplate::new(200).set_body_string(BODY))
1403            .expect(1)
1404            .mount(&server)
1405            .await;
1406
1407        assert_eq!(
1408            1,
1409            exec_task
1410                .tick_test(sim_clock.now(), RunId::generate())
1411                .await
1412                .wait_for_tasks()
1413                .await
1414                .len()
1415        );
1416        let exec_log = db_connection.get(&execution_id).await.unwrap();
1417        let stopwatch = stopwatch.elapsed();
1418        info!("Finished in {stopwatch:?}");
1419        let (res, http_client_traces) = assert_matches!(
1420                exec_log.last_event().event.clone(),
1421                ExecutionRequest::Finished { retval, http_client_traces: Some(http_client_traces) }
1422                => (retval, http_client_traces));
1423        let wast_val_with_type = assert_matches!(res, SupportedFunctionReturnValue::Ok(Some(wast_val_with_type)) => wast_val_with_type);
1424        let val = assert_matches!(wast_val_with_type.value, WastVal::String(val) => val);
1425        assert_eq!(BODY, val.deref());
1426        // check types
1427        assert_matches!(wast_val_with_type.r#type, TypeWrapper::String);
1428        assert_eq!(1, http_client_traces.len());
1429        let http_client_trace = http_client_traces.into_iter().next().unwrap();
1430        let (method, uri_actual) = assert_matches!(
1431            http_client_trace,
1432            HttpClientTrace {
1433                req: RequestTrace {
1434                    method,
1435                    sent_at: _,
1436                    uri
1437                },
1438                resp: Some(ResponseTrace {
1439                    status: Ok(200),
1440                    finished_at: _
1441                })
1442            }
1443            => (method, uri)
1444        );
1445        assert_eq!("GET", method);
1446        assert_eq!(format!("{uri}/"), *uri_actual);
1447        drop(db_connection);
1448        drop(exec_task);
1449        db_close.close().await;
1450    }
1451
1452    #[rstest]
1453    #[tokio::test]
1454    async fn http_get_activity_trap_should_be_turned_into_finished_execution_error_permanent_failure(
1455        #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
1456        locking_strategy: LockingStrategy,
1457    ) {
1458        use wiremock::{
1459            Mock, MockServer, ResponseTemplate,
1460            matchers::{method, path},
1461        };
1462        const STATUS: u16 = 418; // I'm a teapot causes trap
1463        test_utils::set_up();
1464        info!("All set up");
1465        let sim_clock = SimClock::default();
1466        let (_guard, db_pool, db_close) = Database::Sqlite.set_up().await;
1467        let engine = Engines::get_activity_engine_test(EngineConfig::on_demand_testing()).unwrap();
1468
1469        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1470        let server_address = listener
1471            .local_addr()
1472            .expect("Failed to get server address.");
1473        let uri = format!("http://127.0.0.1:{port}", port = server_address.port());
1474
1475        let (worker, _) = new_activity_worker_with_config(
1476            test_programs_http_get_activity_builder::TEST_PROGRAMS_HTTP_GET_ACTIVITY,
1477            engine,
1478            sim_clock.clone_box(),
1479            TokioSleep,
1480            {
1481                let uri = uri.clone();
1482                move |component_id| activity_config_allowed_host(component_id, &uri)
1483            },
1484        )
1485        .await;
1486
1487        let exec_config = ExecConfig {
1488            batch_size: 1,
1489            lock_expiry: Duration::from_secs(1),
1490            tick_sleep: Duration::ZERO,
1491            component_id: ComponentId::dummy_activity(),
1492            task_limiter_global: None,
1493            task_limiter_local: None,
1494            executor_id: ExecutorId::generate(),
1495            retry_config: ComponentRetryConfig::ZERO,
1496            locking_strategy,
1497        };
1498        let ffqns = Arc::from([HTTP_GET_SUCCESSFUL_ACTIVITY]);
1499        let exec_task = ExecTask::new_test(
1500            exec_config,
1501            worker,
1502            sim_clock.clone_box(),
1503            db_pool.clone(),
1504            ffqns,
1505        );
1506
1507        let params = Params::from_json_values_test(vec![json!(uri.clone())]);
1508        let execution_id = ExecutionId::generate();
1509        let created_at = sim_clock.now();
1510        let db_connection = db_pool.connection_test().await.unwrap();
1511        info!("Creating execution");
1512        let stopwatch = std::time::Instant::now();
1513        db_connection
1514            .create(CreateRequest {
1515                created_at,
1516                execution_id: execution_id.clone(),
1517                ffqn: HTTP_GET_SUCCESSFUL_ACTIVITY,
1518                params,
1519                parent: None,
1520                metadata: concepts::ExecutionMetadata::empty(),
1521                scheduled_at: created_at,
1522                component_id: ComponentId::dummy_activity(),
1523
1524                deployment_id: DEPLOYMENT_ID_DUMMY,
1525                scheduled_by: None,
1526                paused: false,
1527            })
1528            .await
1529            .unwrap();
1530
1531        let server = MockServer::builder().listener(listener).start().await;
1532        Mock::given(method("GET"))
1533            .and(path("/"))
1534            .respond_with(ResponseTemplate::new(STATUS).set_body_string(""))
1535            .expect(1)
1536            .mount(&server)
1537            .await;
1538
1539        assert_eq!(
1540            1,
1541            exec_task
1542                .tick_test(sim_clock.now(), RunId::generate())
1543                .await
1544                .wait_for_tasks()
1545                .await
1546                .len()
1547        );
1548        let exec_log = db_connection.get(&execution_id).await.unwrap();
1549        let stopwatch = stopwatch.elapsed();
1550        info!("Finished in {stopwatch:?}");
1551        let (res, http_client_traces) = assert_matches!(
1552                exec_log.last_event().event.clone(),
1553                ExecutionRequest::Finished { retval, http_client_traces: Some(http_client_traces) }
1554                => (retval, http_client_traces));
1555        let res = assert_matches!(res, SupportedFunctionReturnValue::ExecutionFailure(err) => err);
1556        let reason = assert_matches!(
1557            res,
1558            FinishedExecutionFailure {
1559                kind: ExecutionFailureKind::Uncategorized,
1560                reason: Some(reason), // activity trap
1561                detail: _
1562            } => reason
1563        );
1564        assert!(reason.starts_with("activity trap"), "{reason}");
1565
1566        assert_eq!(1, http_client_traces.len());
1567        let http_client_trace = http_client_traces.into_iter().next().unwrap();
1568        let (method, uri_actual) = assert_matches!(
1569            http_client_trace,
1570            HttpClientTrace {
1571                req: RequestTrace {
1572                    method,
1573                    sent_at: _,
1574                    uri
1575                },
1576                resp: Some(ResponseTrace {
1577                    status: Ok(STATUS),
1578                    finished_at: _
1579                })
1580            }
1581            => (method, uri)
1582        );
1583        assert_eq!("GET", method);
1584        assert_eq!(format!("{uri}/"), *uri_actual);
1585        drop(db_connection);
1586        drop(exec_task);
1587        db_close.close().await;
1588    }
1589
1590    #[rstest::rstest]
1591    #[tokio::test]
1592    async fn http_get_retry_on_fallible_err(
1593        #[values(TestRetryBehavior::SucceedOnRetry,TestRetryBehavior::Fail { expected_retry_err: "wrong status code: 404" })]
1594        test_retry_behavior: TestRetryBehavior,
1595        #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
1596        locking_strategy: LockingStrategy,
1597    ) {
1598        test_utils::set_up();
1599
1600        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1601        let worker = create_activity_worker_with_allowed_host(
1602            test_programs_http_get_activity_builder::TEST_PROGRAMS_HTTP_GET_ACTIVITY,
1603            &listener,
1604        )
1605        .await;
1606
1607        run_http_get_retry_test(
1608            listener,
1609            worker,
1610            HTTP_GET_SUCCESSFUL_ACTIVITY,
1611            |uri| Params::from_json_values_test(vec![json!(uri)]),
1612            locking_strategy,
1613            "wrong status code: 500",
1614            test_retry_behavior,
1615        )
1616        .await;
1617    }
1618
1619    #[tokio::test]
1620    async fn http_get_denied_host() {
1621        use wiremock::{
1622            Mock, MockServer, ResponseTemplate,
1623            matchers::{method, path},
1624        };
1625        test_utils::set_up();
1626        let sim_clock = SimClock::default();
1627        let (_guard, db_pool, db_close) = Database::Sqlite.set_up().await;
1628        let engine = Engines::get_activity_engine_test(EngineConfig::on_demand_testing()).unwrap();
1629
1630        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1631        let server_address = listener.local_addr().unwrap();
1632        let uri = format!("http://127.0.0.1:{port}", port = server_address.port());
1633
1634        // Create worker with NO allowed hosts - the request should be denied
1635        let (worker, _) = new_activity_worker_with_config(
1636            test_programs_http_get_activity_builder::TEST_PROGRAMS_HTTP_GET_ACTIVITY,
1637            engine,
1638            sim_clock.clone_box(),
1639            TokioSleep,
1640            activity_config, // no allowed hosts
1641        )
1642        .await;
1643
1644        let exec_config = ExecConfig {
1645            batch_size: 1,
1646            lock_expiry: Duration::from_secs(1),
1647            tick_sleep: Duration::ZERO,
1648            component_id: ComponentId::dummy_activity(),
1649            task_limiter_global: None,
1650            task_limiter_local: None,
1651            executor_id: ExecutorId::generate(),
1652            retry_config: ComponentRetryConfig::ZERO,
1653            locking_strategy: LockingStrategy::ByComponentDigest,
1654        };
1655        let ffqns = Arc::from([HTTP_GET_SUCCESSFUL_ACTIVITY]);
1656        let exec_task = ExecTask::new_test(
1657            exec_config,
1658            worker,
1659            sim_clock.clone_box(),
1660            db_pool.clone(),
1661            ffqns,
1662        );
1663
1664        let params = Params::from_json_values_test(vec![json!(uri.clone())]);
1665        let execution_id = ExecutionId::generate();
1666        let created_at = sim_clock.now();
1667        let db_connection = db_pool.connection_test().await.unwrap();
1668        db_connection
1669            .create(CreateRequest {
1670                created_at,
1671                execution_id: execution_id.clone(),
1672                ffqn: HTTP_GET_SUCCESSFUL_ACTIVITY,
1673                params,
1674                parent: None,
1675                metadata: concepts::ExecutionMetadata::empty(),
1676                scheduled_at: created_at,
1677                component_id: ComponentId::dummy_activity(),
1678                deployment_id: DEPLOYMENT_ID_DUMMY,
1679                scheduled_by: None,
1680                paused: false,
1681            })
1682            .await
1683            .unwrap();
1684
1685        let server = MockServer::builder().listener(listener).start().await;
1686        Mock::given(method("GET"))
1687            .and(path("/"))
1688            .respond_with(ResponseTemplate::new(200).set_body_string("should not reach"))
1689            .expect(0) // Should NOT be called since host is denied
1690            .mount(&server)
1691            .await;
1692
1693        assert_eq!(
1694            1,
1695            exec_task
1696                .tick_test(sim_clock.now(), RunId::generate())
1697                .await
1698                .wait_for_tasks()
1699                .await
1700                .len()
1701        );
1702        let exec_log = db_connection.get(&execution_id).await.unwrap();
1703        let retval = assert_matches!(
1704            exec_log.last_event().event.clone(),
1705            ExecutionRequest::Finished { retval, .. } => retval
1706        );
1707        // The execution should fail with an ExecutionFailure when the WASM traps.
1708        // The trap happens because the HTTP request is denied and the WASM unwraps the error.
1709        let err = assert_matches!(retval, SupportedFunctionReturnValue::Err(Some(err)) => err);
1710        let err = assert_matches!(err.value, WastVal::String(err) => err);
1711        assert_eq!("ErrorCode::HttpRequestDenied", err);
1712        // Verify the mock server was not called (request was blocked before reaching it)
1713        server.verify().await;
1714        drop(db_connection);
1715        drop(exec_task);
1716        db_close.close().await;
1717    }
1718
1719    #[rstest]
1720    #[tokio::test]
1721    async fn http_get_with_secret(
1722        #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
1723        locking_strategy: LockingStrategy,
1724    ) {
1725        use crate::http_request_policy::{AllowedHostConfig, MethodsPattern, ReplacementLocation};
1726        use hashbrown::HashSet;
1727        use secrecy::SecretString;
1728        use wiremock::{
1729            Mock, MockServer, ResponseTemplate,
1730            matchers::{header, method, path, query_param},
1731        };
1732        const SECRET_VALUE: &str = "my-secret-api-key-12345";
1733        const SECRET_ENV_VAR: &str = "TEST_API_KEY";
1734        test_utils::set_up();
1735        let sim_clock = SimClock::default();
1736        let (_guard, db_pool, db_close) = Database::Sqlite.set_up().await;
1737        let engine = Engines::get_activity_engine_test(EngineConfig::on_demand_testing()).unwrap();
1738
1739        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1740        let server_address = listener.local_addr().unwrap();
1741        let allowed_host = format!("http://127.0.0.1:{port}", port = server_address.port());
1742        let host_pattern =
1743            HostPattern::parse_with_methods(&allowed_host, MethodsPattern::AllMethods).unwrap();
1744
1745        // Create worker with secret configuration
1746        let (worker, component_id) = new_activity_worker_with_config(
1747            test_programs_http_get_activity_builder::TEST_PROGRAMS_HTTP_GET_ACTIVITY,
1748            engine,
1749            sim_clock.clone_box(),
1750            TokioSleep,
1751            {
1752                let host_pattern = host_pattern.clone();
1753                move |component_id| ActivityConfig {
1754                    component_id,
1755                    forward_stdout: None,
1756                    forward_stderr: None,
1757                    env_vars: Arc::from([]),
1758
1759                    fuel: None,
1760                    allowed_hosts: Arc::from(vec![AllowedHostConfig {
1761                        pattern: host_pattern,
1762                        secret_env_mappings: vec![(
1763                            SECRET_ENV_VAR.to_string(),
1764                            SecretString::from(SECRET_VALUE.to_string()),
1765                        )],
1766                        replace_in: HashSet::from_iter([
1767                            ReplacementLocation::Headers,
1768                            ReplacementLocation::Params,
1769                            ReplacementLocation::Body,
1770                        ]),
1771                    }]),
1772                    config_section_hint: ConfigSectionHint::ActivityWasm,
1773                }
1774            },
1775        )
1776        .await;
1777
1778        let exec_config = ExecConfig {
1779            batch_size: 1,
1780            lock_expiry: Duration::from_secs(1),
1781            tick_sleep: Duration::ZERO,
1782            component_id: component_id.clone(),
1783            task_limiter_global: None,
1784            task_limiter_local: None,
1785            executor_id: ExecutorId::generate(),
1786            retry_config: ComponentRetryConfig::ZERO,
1787            locking_strategy,
1788        };
1789        let secret_get_ffqn: FunctionFqn =
1790            FunctionFqn::new_static("testing:http/http-get", "secret-get");
1791        let ffqns = Arc::from([secret_get_ffqn.clone()]);
1792        let exec_task = ExecTask::new_test(
1793            exec_config,
1794            worker,
1795            sim_clock.clone_box(),
1796            db_pool.clone(),
1797            ffqns,
1798        );
1799
1800        // secret-get takes: url, env_var, header (optional)
1801        // The url contains the placeholder which gets replaced with the secret
1802        let url_with_placeholder = format!("{allowed_host}/?secret={SECRET_ENV_VAR}");
1803        let header_with_placeholder =
1804            Some(("X-API-Key".to_string(), format!("Bearer {SECRET_ENV_VAR}")));
1805        let params = Params::from_json_values_test(vec![
1806            json!(url_with_placeholder),
1807            json!(SECRET_ENV_VAR),
1808            json!(header_with_placeholder),
1809        ]);
1810        let execution_id = ExecutionId::generate();
1811        let created_at = sim_clock.now();
1812        let db_connection = db_pool.connection_test().await.unwrap();
1813        db_connection
1814            .create(CreateRequest {
1815                created_at,
1816                execution_id: execution_id.clone(),
1817                ffqn: secret_get_ffqn.clone(),
1818                params,
1819                parent: None,
1820                metadata: concepts::ExecutionMetadata::empty(),
1821                scheduled_at: created_at,
1822                component_id,
1823                deployment_id: DEPLOYMENT_ID_DUMMY,
1824                scheduled_by: None,
1825                paused: false,
1826            })
1827            .await
1828            .unwrap();
1829
1830        let server = MockServer::builder().listener(listener).start().await;
1831        // Verify the secret was replaced in both query params and headers
1832        Mock::given(method("GET"))
1833            .and(path("/"))
1834            .and(query_param("secret", SECRET_VALUE))
1835            .and(header("X-API-Key", format!("Bearer {SECRET_VALUE}")))
1836            .respond_with(ResponseTemplate::new(200).set_body_string("secret-received"))
1837            .expect(1)
1838            .mount(&server)
1839            .await;
1840
1841        assert_eq!(
1842            1,
1843            exec_task
1844                .tick_test(sim_clock.now(), RunId::generate())
1845                .await
1846                .wait_for_tasks()
1847                .await
1848                .len()
1849        );
1850        let exec_log = db_connection.get(&execution_id).await.unwrap();
1851        let retval = assert_matches!(
1852            exec_log.last_event().event.clone(),
1853            ExecutionRequest::Finished { retval, .. } => retval
1854        );
1855        // Should succeed with the secret replaced
1856        assert_matches!(retval, SupportedFunctionReturnValue::Ok(..));
1857        server.verify().await;
1858        drop(db_connection);
1859        drop(exec_task);
1860        db_close.close().await;
1861    }
1862
1863    #[rstest::rstest(
1864            param => [
1865                r#"{"image": "foo", "a": false, "b":false}"#,
1866                r#"{"b": false, "a":false, "image": "foo"}"#,
1867                ])]
1868    #[tokio::test]
1869    async fn record_field_ordering(
1870        param: &str,
1871        #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
1872        locking_strategy: LockingStrategy,
1873    ) {
1874        test_utils::set_up();
1875        let sim_clock = SimClock::default();
1876        let (_guard, db_pool, db_close) = Database::Sqlite.set_up().await;
1877        let db_connection = db_pool.connection().await.unwrap();
1878        let exec = new_activity_with_config(
1879            db_pool.clone(),
1880            test_programs_serde_activity_builder::TEST_PROGRAMS_SERDE_ACTIVITY,
1881            sim_clock.clone_box(),
1882            TokioSleep,
1883            move |component_id| ActivityConfig {
1884                component_id,
1885                forward_stdout: Some(StdOutputConfig::Stderr),
1886                forward_stderr: Some(StdOutputConfig::Stderr),
1887                env_vars: Arc::default(),
1888
1889                fuel: None,
1890                allowed_hosts: Arc::from([]),
1891                config_section_hint: ConfigSectionHint::ActivityWasm,
1892            },
1893            ComponentRetryConfig::ZERO,
1894            locking_strategy,
1895        )
1896        .await;
1897        // Create an execution.
1898        let ffqn = FunctionFqn::new_static_tuple(
1899            test_programs_serde_activity_builder::exports::testing::serde::serde::REC,
1900        );
1901        let execution_id = ExecutionId::generate();
1902        let created_at = sim_clock.now();
1903        db_connection
1904            .create(CreateRequest {
1905                created_at,
1906                execution_id: execution_id.clone(),
1907                ffqn,
1908                params: Params::from_json_values_test(vec![serde_json::from_str(param).unwrap()]),
1909                parent: None,
1910                metadata: concepts::ExecutionMetadata::empty(),
1911                scheduled_at: created_at,
1912                component_id: exec.config.component_id.clone(),
1913
1914                deployment_id: DEPLOYMENT_ID_DUMMY,
1915                scheduled_by: None,
1916                paused: false,
1917            })
1918            .await
1919            .unwrap();
1920        let executed = exec
1921            .tick_test_await(sim_clock.now(), RunId::generate())
1922            .await;
1923        assert_eq!(vec![execution_id.clone()], executed);
1924        // Check the result.
1925        let res = db_connection
1926            .wait_for_finished_result(
1927                &execution_id,
1928                Some(Box::pin(future::ready(TimeoutOutcome::Cancel))),
1929            )
1930            .await
1931            .unwrap();
1932        let record = assert_matches!(res, SupportedFunctionReturnValue::Ok(record) => record);
1933        insta::with_settings!({
1934            prepend_module_to_snapshot => false},
1935            {
1936                assert_json_snapshot!(record);
1937            }
1938        );
1939        db_close.close().await;
1940    }
1941
1942    #[rstest]
1943    #[tokio::test]
1944    async fn variant_with_optional_none(
1945        #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
1946        locking_strategy: LockingStrategy,
1947    ) {
1948        test_utils::set_up();
1949        let sim_clock = SimClock::default();
1950        let (_guard, db_pool, db_close) = Database::Sqlite.set_up().await;
1951        let db_connection = db_pool.connection().await.unwrap();
1952        let exec = new_activity_with_config(
1953            db_pool.clone(),
1954            test_programs_serde_activity_builder::TEST_PROGRAMS_SERDE_ACTIVITY,
1955            sim_clock.clone_box(),
1956            TokioSleep,
1957            move |component_id| ActivityConfig {
1958                component_id,
1959                forward_stdout: Some(StdOutputConfig::Stderr),
1960                forward_stderr: Some(StdOutputConfig::Stderr),
1961                env_vars: Arc::default(),
1962
1963                fuel: None,
1964                allowed_hosts: Arc::from([]),
1965                config_section_hint: ConfigSectionHint::ActivityWasm,
1966            },
1967            ComponentRetryConfig::ZERO,
1968            locking_strategy,
1969        )
1970        .await;
1971        // Create an execution.
1972        let ffqn = FunctionFqn::new_static_tuple(
1973            test_programs_serde_activity_builder::exports::testing::serde::serde::VAR,
1974        );
1975        let execution_id = ExecutionId::generate();
1976        let created_at = sim_clock.now();
1977        db_connection
1978            .create(CreateRequest {
1979                created_at,
1980                execution_id: execution_id.clone(),
1981                ffqn,
1982                params: Params::from_json_values_test(vec![json!({"var1":null})]),
1983                parent: None,
1984                metadata: concepts::ExecutionMetadata::empty(),
1985                scheduled_at: created_at,
1986                component_id: exec.config.component_id.clone(),
1987
1988                deployment_id: DEPLOYMENT_ID_DUMMY,
1989                scheduled_by: None,
1990                paused: false,
1991            })
1992            .await
1993            .unwrap();
1994        let executed = exec
1995            .tick_test_await(sim_clock.now(), RunId::generate())
1996            .await;
1997        assert_eq!(vec![execution_id.clone()], executed);
1998        // Check the result.
1999        let res = db_connection
2000            .wait_for_finished_result(
2001                &execution_id,
2002                Some(Box::pin(future::ready(TimeoutOutcome::Cancel))),
2003            )
2004            .await
2005            .unwrap();
2006        let variant = assert_matches!(res, SupportedFunctionReturnValue::Ok(variant) => variant);
2007
2008        insta::with_settings!({
2009            prepend_module_to_snapshot => false},
2010            {
2011                assert_json_snapshot!(variant);
2012            }
2013        );
2014        db_close.close().await;
2015    }
2016
2017    #[rstest]
2018    #[tokio::test]
2019    async fn permanent_error_variant_should_not_retry(
2020        #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
2021        locking_strategy: LockingStrategy,
2022    ) {
2023        test_utils::set_up();
2024        let sim_clock = SimClock::default();
2025        let (_guard, db_pool, db_close) = Database::Sqlite.set_up().await;
2026        let db_connection = db_pool.connection().await.unwrap();
2027        // max_retries > 0, so it would retry if error wasn't permanent
2028        let retry_config = ComponentRetryConfig {
2029            max_retries: Some(1),
2030            retry_exp_backoff: Duration::from_millis(10),
2031        };
2032        let exec = new_activity_with_config(
2033            db_pool.clone(),
2034            test_programs_serde_activity_builder::TEST_PROGRAMS_SERDE_ACTIVITY,
2035            sim_clock.clone_box(),
2036            TokioSleep,
2037            move |component_id| ActivityConfig {
2038                component_id,
2039                forward_stdout: Some(StdOutputConfig::Stderr),
2040                forward_stderr: Some(StdOutputConfig::Stderr),
2041                env_vars: Arc::default(),
2042
2043                fuel: None,
2044                allowed_hosts: Arc::from([]),
2045                config_section_hint: ConfigSectionHint::ActivityWasm,
2046            },
2047            retry_config,
2048            locking_strategy,
2049        )
2050        .await;
2051        // Create an execution.
2052        let ffqn = FunctionFqn::new_static_tuple(
2053            test_programs_serde_activity_builder::exports::testing::serde::serde::PERMANENT_ERR,
2054        );
2055        let execution_id = ExecutionId::generate();
2056        let created_at = sim_clock.now();
2057        db_connection
2058            .create(CreateRequest {
2059                created_at,
2060                execution_id: execution_id.clone(),
2061                ffqn,
2062                params: Params::empty(),
2063                parent: None,
2064                metadata: concepts::ExecutionMetadata::empty(),
2065                scheduled_at: created_at,
2066                component_id: exec.config.component_id.clone(),
2067
2068                deployment_id: DEPLOYMENT_ID_DUMMY,
2069                scheduled_by: None,
2070                paused: false,
2071            })
2072            .await
2073            .unwrap();
2074        let executed = exec
2075            .tick_test_await(sim_clock.now(), RunId::generate())
2076            .await;
2077        assert_eq!(vec![execution_id.clone()], executed);
2078        // Check the result - should be Finished with Err, not TemporarilyFailed
2079        let res = db_connection
2080            .wait_for_finished_result(
2081                &execution_id,
2082                Some(Box::pin(future::ready(TimeoutOutcome::Cancel))),
2083            )
2084            .await
2085            .unwrap();
2086        // The permanent-failure variant should prevent retry and finish with Err
2087        let err = assert_matches!(res, SupportedFunctionReturnValue::Err(err) => err);
2088        let (key, _) = assert_matches!(
2089            err,
2090            Some(WastValWithType {
2091                value: WastVal::Variant(key, payload),
2092                ..
2093            }) => (key, payload)
2094        );
2095        assert_eq!("permanent_failure", key.as_snake_str());
2096        db_close.close().await;
2097    }
2098}