Skip to main content

obeli_sk_wasm_workers/webhook/
webhook_trigger.rs

1use crate::component_logger::{ComponentLogger, LogStrageConfig, log_activities};
2use crate::envvar::EnvVar;
3use crate::http_hooks::{HttpClientTracesContainer, HttpHooks};
4use crate::std_output_stream::{LogStream, StdOutput, StdOutputConfig, StdOutputConfigWithSender};
5use crate::webhook::webhook_registry::WebhookStateWatcher;
6use crate::webhook::webhook_trigger::types::obelisk::types::join_set::JoinNextError;
7use crate::webhook::webhook_trigger::types::{
8    GetErrorTrappable, GetStatusErrorTrappable, ScheduleJsonErrorTrappable, TryGetErrorTrappable,
9};
10use crate::workflow::host_exports::{SUFFIX_FN_SCHEDULE, history_event_schedule_at_from_wast_val};
11use crate::{RunnableComponent, WasmFileError};
12use assert_matches::assert_matches;
13use concepts::prefixed_ulid::{
14    DeploymentId, ExecutionIdDerived, ExecutionIdTopLevel, JOIN_SET_START_IDX, RunId,
15};
16use concepts::storage::{
17    AppendRequest, BacktraceInfo, CreateRequest, DbConnection, DbErrorGeneric, DbErrorRead,
18    DbErrorReadWithTimeout, DbErrorWrite, DbPool, ExecutionRequest, HistoryEvent,
19    HistoryEventScheduleAt, JoinSetRequest, LogInfoAppendRow, LogLevel, LogStreamType,
20    PendingState, PendingStateFinishedError, PendingStateFinishedResultKind, TimeoutOutcome,
21    Version, http_client_trace::HttpClientTrace,
22};
23use concepts::time::{ClockFn, Sleep};
24use concepts::{
25    ComponentId, ExecutionFailureKind, ExecutionId, ExecutionMetadata, FinishedExecutionFailure,
26    FunctionFqn, FunctionMetadata, FunctionRegistry, IfcFqnName, JoinSetKind, Params, ReturnType,
27    SUFFIX_PKG_SCHEDULE, SUPPORTED_RETURN_VALUE_OK_EMPTY, StrVariant, TrapKind,
28};
29use concepts::{JoinSetId, SupportedFunctionReturnValue};
30use http_body_util::combinators::UnsyncBoxBody;
31use hyper::body::Bytes;
32use hyper::server::conn::http1;
33use hyper::{Method, StatusCode, Uri};
34use hyper_util::rt::TokioIo;
35use log_activities::obelisk::log::log::Host;
36use route_recognizer::{Match, Router};
37use std::collections::HashMap;
38use std::ops::Deref;
39use std::pin::Pin;
40use std::str::FromStr;
41use std::time::Duration;
42use std::{fmt::Debug, sync::Arc};
43use tokio::net::TcpListener;
44use tokio::select;
45use tokio::sync::{OwnedSemaphorePermit, mpsc, watch};
46use tracing::{
47    Instrument, Span, debug, debug_span, error, info, info_span, instrument, trace, warn,
48};
49use types::obelisk::types::execution::Host as ExecutionHost;
50use types::obelisk::types::join_set::HostJoinSet;
51use types::obelisk::webhook::webhook_support::Host as WebhookSupportHost;
52use val_json::wast_val::WastVal;
53use wasmtime::component::ResourceTable;
54use wasmtime::component::types::ComponentFunc;
55use wasmtime::component::{Linker, Val};
56use wasmtime::{Engine, Store, UpdateDeadline};
57use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
58use wasmtime_wasi_http::WasiHttpCtx;
59use wasmtime_wasi_http::p2::bindings::ProxyPre;
60use wasmtime_wasi_http::p2::bindings::http::types::Scheme;
61use wasmtime_wasi_http::p2::body::HyperOutgoingBody;
62use wasmtime_wasi_http::p2::{WasiHttpCtxView, WasiHttpView};
63use wasmtime_wasi_io::IoView;
64
/// Fully qualified name of the `handle` function in the `wasi:http/incoming-handler` interface.
65const HTTP_HANDLER_FFQN: FunctionFqn =
66    FunctionFqn::new_static("wasi:http/incoming-handler", "handle");
67
/// Host bindings generated by `wasmtime::component::bindgen!` for the Obelisk WIT
/// interfaces listed in the inline world, together with the "trappable" error
/// wrappers that the macro's `trappable_error_type` table points at.
///
/// Each wrapper enum has two variants: `Normal` carries the WIT-level error value
/// and `Trap` carries a `wasmtime::Error`.
68pub(crate) mod types {
69    wasmtime::component::bindgen!({
70        path: "host-wit-webhook/",
71        inline: "package any:any;
72                world bindings {
73                    import obelisk:types/time@4.2.0;
74                    import obelisk:types/execution@4.2.0;
75                    import obelisk:types/backtrace@4.2.0;
76                    import obelisk:types/join-set@4.2.0;
77                    import obelisk:webhook/webhook-support@5.2.0;
78                }",
79        world: "any:any/bindings",
80        imports: {
81            // Make webhook-support functions async and trappable for infrastructure errors
82            "obelisk:webhook/webhook-support": async | trappable,
83        },
84        with: {
85            "obelisk:types/join-set.join-set": concepts::JoinSetId,
86        },
87        trappable_error_type: {
88            "obelisk:types/execution.schedule-json-error" => crate::webhook::webhook_trigger::types::ScheduleJsonErrorTrappable,
89            "obelisk:webhook/webhook-support.get-error" => crate::webhook::webhook_trigger::types::GetErrorTrappable,
90            "obelisk:webhook/webhook-support.get-status-error" => crate::webhook::webhook_trigger::types::GetStatusErrorTrappable,
91            "obelisk:webhook/webhook-support.try-get-error" => crate::webhook::webhook_trigger::types::TryGetErrorTrappable,
92        },
93    });
94
95    /// Trappable wrapper for `ScheduleJsonError` - user errors vs infrastructure failures.
96    #[derive(Debug, thiserror::Error)]
97    pub enum ScheduleJsonErrorTrappable {
        /// WIT-level `schedule-json-error` value.
98        #[error(transparent)]
99        Normal(#[from] obelisk::types::execution::ScheduleJsonError),
        /// Host-side `wasmtime::Error`.
100        #[error(transparent)]
101        Trap(#[from] wasmtime::Error),
102    }
103
104    /// Trappable wrapper for `GetError` - user errors vs infrastructure failures.
105    #[derive(Debug, thiserror::Error)]
106    pub enum GetErrorTrappable {
        /// WIT-level `get-error` value.
107        #[error(transparent)]
108        Normal(#[from] obelisk::webhook::webhook_support::GetError),
        /// Host-side `wasmtime::Error`.
109        #[error(transparent)]
110        Trap(#[from] wasmtime::Error),
111    }
112
113    /// Trappable wrapper for `GetStatusError` - user errors vs infrastructure failures.
114    #[derive(Debug, thiserror::Error)]
115    pub enum GetStatusErrorTrappable {
        /// WIT-level `get-status-error` value.
116        #[error(transparent)]
117        Normal(#[from] obelisk::webhook::webhook_support::GetStatusError),
        /// Host-side `wasmtime::Error`.
118        #[error(transparent)]
119        Trap(#[from] wasmtime::Error),
120    }
121
122    /// Trappable wrapper for `TryGetError` - user errors vs infrastructure failures.
123    #[derive(Debug, thiserror::Error)]
124    pub enum TryGetErrorTrappable {
        /// WIT-level `try-get-error` value.
125        #[error(transparent)]
126        Normal(#[from] obelisk::webhook::webhook_support::TryGetError),
        /// Host-side `wasmtime::Error`.
127        #[error(transparent)]
128        Trap(#[from] wasmtime::Error),
129    }
130}
131
132// Conversions from webhook types to internal types
133/// Convert `SupportedFunctionReturnValue` to the JSON result format expected by webhook-support.
134/// Returns Ok(Some(json)) for successful result with value,
135/// Ok(None) for successful result with no value,
136/// Err(Some(json)) for error result with value,
137/// Err(None) for error result with no value.
138fn supported_return_value_to_json_result(
139    retval: &SupportedFunctionReturnValue,
140) -> Result<Option<String>, Option<String>> {
141    match retval {
142        SupportedFunctionReturnValue::Ok(Some(val_with_type)) => {
143            let json =
144                serde_json::to_string(&val_with_type.value).unwrap_or_else(|_| "null".to_string());
145            Ok(Some(json))
146        }
147        SupportedFunctionReturnValue::Ok(None) => Ok(None),
148        SupportedFunctionReturnValue::Err(Some(val_with_type)) => {
149            let json =
150                serde_json::to_string(&val_with_type.value).unwrap_or_else(|_| "null".to_string());
151            Err(Some(json))
152        }
153        SupportedFunctionReturnValue::Err(None) => Err(None),
154        SupportedFunctionReturnValue::ExecutionFailure(err) => {
155            Err(Some(format!("execution error: {err}")))
156        }
157    }
158}
159
160fn schedule_at_from_webhook(
161    schedule_at: types::obelisk::webhook::webhook_support::ScheduleAt,
162) -> HistoryEventScheduleAt {
163    use chrono::{DateTime, Utc};
164    use std::time::UNIX_EPOCH;
165    use types::obelisk::webhook::webhook_support::ScheduleAt;
166
167    match schedule_at {
168        ScheduleAt::Now => HistoryEventScheduleAt::Now,
169        ScheduleAt::At(datetime) => {
170            let duration = Duration::new(datetime.seconds, datetime.nanoseconds);
171            let systemtime = UNIX_EPOCH + duration;
172            HistoryEventScheduleAt::At(DateTime::<Utc>::from(systemtime))
173        }
174        ScheduleAt::In(duration) => {
175            use types::obelisk::types::time::Duration as WitDuration;
176            let std_duration = match duration {
177                WitDuration::Milliseconds(millis) => Duration::from_millis(millis),
178                WitDuration::Seconds(secs) => Duration::from_secs(secs),
179                WitDuration::Minutes(mins) => Duration::from_secs(u64::from(mins * 60)),
180                WitDuration::Hours(hours) => Duration::from_secs(u64::from(hours * 60 * 60)),
181                WitDuration::Days(days) => Duration::from_secs(u64::from(days * 24 * 60 * 60)),
182            };
183            HistoryEventScheduleAt::In(std_duration)
184        }
185    }
186}
187
/// Serializable configuration that carries only a component id.
188#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
189pub struct HttpTriggerConfig {
    /// Id of the component this configuration refers to.
190    pub component_id: ComponentId,
191}
/// Boxed dynamic error that is `Send + Sync`.
192type StdError = Box<dyn std::error::Error + Send + Sync>;
193
/// Error returned by [`server`] when the accept loop has to stop.
194#[derive(Debug, thiserror::Error)]
195pub enum WebhookServerError {
    /// I/O error; `server` produces it when accepting a connection on the listener fails.
196    #[error("socket error: {0}")]
197    SocketError(std::io::Error),
198}
199
/// A webhook endpoint's configuration paired with its compiled component, before linking.
200pub struct WebhookEndpointCompiled {
    /// Endpoint configuration; `link` moves it into the linked instance.
201    pub config: WebhookEndpointConfig,
    /// Compiled component that `link` pre-instantiates.
202    pub runnable_component: RunnableComponent,
203}
204
205impl WebhookEndpointCompiled {
206    pub fn new(
207        config: WebhookEndpointConfig,
208        runnable_component: RunnableComponent,
209    ) -> Result<Self, WasmFileError> {
210        Ok(Self {
211            config,
212            runnable_component,
213        })
214    }
215
216    #[must_use]
217    pub fn imports(&self) -> &[FunctionMetadata] {
218        &self.runnable_component.wasm_component.exim.imports_flat
219    }
220
221    #[instrument(skip_all, fields(component_id = %self.config.component_id), err)]
222    pub fn link(
223        self,
224        engine: &Engine,
225        fn_registry: &dyn FunctionRegistry,
226    ) -> Result<WebhookEndpointInstanceLinked, WasmFileError> {
227        let mut linker = Linker::new(engine);
228        // Link wasi
229        wasmtime_wasi::p2::add_to_linker_async(&mut linker)
230            .map_err(|err| WasmFileError::linking_error("cannot link `wasmtime_wasi`", err))?;
231        // Link wasi-http
232        wasmtime_wasi_http::p2::add_only_http_to_linker_async(&mut linker)
233            .map_err(|err| WasmFileError::linking_error("cannot link `wasmtime_wasi_http`", err))?;
234        // Link log and types
235        WebhookEndpointCtx::add_to_linker(&mut linker)?;
236
237        // Mock imported functions
238        for import in fn_registry
239            .all_exports()
240            .iter()
241            .filter(|import| {
242                // Skip already linked functions to avoid unexpected behavior and security issues.
243                !import.ifc_fqn.is_namespace_obelisk() && !import.ifc_fqn.is_namespace_wasi()
244            })
245            .filter(|import| {
246                // Keep only no-ext and -schedule interfaces
247                !import.ifc_fqn.is_extension()
248                    || import
249                        .ifc_fqn
250                        .package_strip_obelisk_schedule_suffix()
251                        .is_some()
252            })
253        {
254            trace!(
255                ifc_fqn = %import.ifc_fqn,
256                "Adding imported interface to the linker",
257            );
258            match linker.instance(import.ifc_fqn.deref()) {
259                Ok(mut linker_instance) => {
260                    for function_name in import.fns.keys() {
261                        let ffqn = FunctionFqn {
262                            ifc_fqn: import.ifc_fqn.clone(),
263                            function_name: function_name.clone(),
264                        };
265                        trace!("Adding mock for imported function {ffqn} to the linker");
266                        let res = linker_instance.func_new_async(function_name.deref(), {
267                            let ffqn = ffqn.clone();
268                            move |mut store_ctx: wasmtime::StoreContextMut<
269                                '_,
270                                WebhookEndpointCtx,
271                            >,
272                                  _component_func: ComponentFunc,
273                                  params: &[Val],
274                                  results: &mut [Val]| {
275                                let ffqn = ffqn.clone();
276                                let wasm_backtrace = if self.config.backtrace_persist {
277                                    let wasm_backtrace =
278                                        wasmtime::WasmBacktrace::capture(&store_ctx);
279                                    concepts::storage::WasmBacktrace::maybe_from(&wasm_backtrace)
280                                } else {
281                                    None
282                                };
283
284                                Box::new(async move {
285                                    Ok(store_ctx
286                                        .data_mut()
287                                        .call_imported_fn(ffqn, params, results, wasm_backtrace)
288                                        .await?)
289                                })
290                            }
291                        });
292                        if let Err(err) = res {
293                            return Err(WasmFileError::linking_error(
294                                format!("cannot add mock for imported function {ffqn}"),
295                                err,
296                            ));
297                        }
298                    }
299                }
300                Err(err) => {
301                    warn!(
302                        "Skipping interface {ifc_fqn} - {err:?}",
303                        ifc_fqn = import.ifc_fqn
304                    );
305                }
306            }
307        }
308
309        // Resolve JS imports against the function registry before linking.
310        let mut config = self.config;
311        if let Some(js_config) = &mut config.js_config {
312            js_config.resolved_imports =
313                crate::js_imports::resolve_js_imports(&js_config.source, fn_registry)
314                    .map_err(|e| crate::WasmFileError::linking_error("JS import resolution", e))?;
315        }
316
317        // Pre-instantiate to catch missing imports
318        let proxy_pre = linker
319            .instantiate_pre(&self.runnable_component.wasmtime_component)
320            .map_err(|err: wasmtime::Error| {
321                WasmFileError::linking_error("linking error while creating instantiate_pre", err)
322            })?;
323        let proxy_pre = Arc::new(ProxyPre::new(proxy_pre).map_err(|err: wasmtime::Error| {
324            WasmFileError::linking_error("linking error while creating ProxyPre instance", err)
325        })?);
326
327        Ok(WebhookEndpointInstanceLinked {
328            config: Arc::new(config),
329            proxy_pre,
330        })
331    }
332}
333
/// A webhook endpoint after linking: the pre-instantiated component plus its
/// configuration, both behind `Arc` so that cloning is cheap.
334#[derive(Clone, derive_more::Debug)]
335pub struct WebhookEndpointInstanceLinked {
    /// Pre-instantiated component; left out of the `Debug` output.
336    #[debug(skip)]
337    proxy_pre: Arc<ProxyPre<WebhookEndpointCtx>>,
    /// Endpoint configuration shared with every instance produced by `build`.
338    config: Arc<WebhookEndpointConfig>,
339}
340impl WebhookEndpointInstanceLinked {
341    #[must_use]
342    pub fn build(
343        &self,
344        log_forwarder_sender: &mpsc::Sender<LogInfoAppendRow>,
345    ) -> WebhookEndpointInstance {
346        let stdout = StdOutputConfigWithSender::new(
347            self.config.forward_stdout,
348            log_forwarder_sender,
349            LogStreamType::StdOut,
350        );
351        let stderr = StdOutputConfigWithSender::new(
352            self.config.forward_stderr,
353            log_forwarder_sender,
354            LogStreamType::StdErr,
355        );
356        WebhookEndpointInstance {
357            proxy_pre: self.proxy_pre.clone(),
358            stdout,
359            stderr,
360            logs_storage_config: self.config.logs_store_min_level.map(|min_level| {
361                LogStrageConfig {
362                    min_level,
363                    log_sender: log_forwarder_sender.clone(),
364                }
365            }),
366            config: self.config.clone(),
367        }
368    }
369}
370
/// A linked webhook endpoint together with its output-forwarding and log-storage
/// settings, as produced by `WebhookEndpointInstanceLinked::build`.
371#[derive(Clone, derive_more::Debug)]
372pub struct WebhookEndpointInstance {
    /// Pre-instantiated component; left out of the `Debug` output.
373    #[debug(skip)]
374    proxy_pre: Arc<ProxyPre<WebhookEndpointCtx>>,
    /// Shared endpoint configuration.
375    config: Arc<WebhookEndpointConfig>,
    /// Stdout forwarding built from `forward_stdout`; left out of the `Debug` output.
376    #[debug(skip)]
377    stdout: Option<StdOutputConfigWithSender>,
    /// Stderr forwarding built from `forward_stderr`; left out of the `Debug` output.
378    #[debug(skip)]
379    stderr: Option<StdOutputConfigWithSender>,
    /// Present only when `logs_store_min_level` is configured.
380    logs_storage_config: Option<LogStrageConfig>,
381}
382
/// Router that looks a path up in a per-HTTP-method table first and then in a
/// method-independent fallback table.
383pub struct MethodAwareRouter<T> {
    /// One `route-recognizer` router per HTTP method.
384    method_map: hashbrown::HashMap<Method, Router<T>>,
385    fallback: Router<T>, // Routes that do not specify a method. Will be queried only if no match is found in `method_map`.
386}
387
388// Clone only because of potentially registering 2 paths via `route-recognizer`
389impl<T: Clone> MethodAwareRouter<T> {
390    pub fn add(&mut self, method: Option<Method>, route: &str, dest: T) {
391        let route = if route.is_empty() { "/*" } else { route };
392
393        let mut add = |method, route, dest| {
394            if let Some(method) = method {
395                self.method_map.entry(method).or_default().add(route, dest);
396            } else {
397                self.fallback.add(route, dest);
398            }
399        };
400
401        let prefix_with_slash;
402        if let Some(prefix) = route.strip_suffix("/*") {
403            // Add {prefix}/ because the library would not match it otherwise.
404            prefix_with_slash = format!("{prefix}/");
405            add(method.clone(), &prefix_with_slash, dest.clone());
406        }
407        add(method, route, dest);
408    }
409}
410
411impl<T> MethodAwareRouter<T> {
412    fn find(&self, method: &Method, path: &Uri) -> Option<Match<&T>> {
413        let path = path.path();
414        self.method_map
415            .get(method)
416            .and_then(|router| router.recognize(path).ok())
417            .or_else(|| self.fallback.recognize(path).ok())
418    }
419}
420
421impl<T> Default for MethodAwareRouter<T> {
422    fn default() -> Self {
423        Self {
424            method_map: hashbrown::HashMap::default(),
425            fallback: Router::default(),
426        }
427    }
428}
429
430/// Swappable per-server state for hot-redeploy.
///
/// `server` snapshots this value once per accepted connection and reuses the
/// snapshot for every request on that connection.
431pub struct WebhookServerState {
    /// Deployment this state belongs to.
432    pub deployment_id: DeploymentId,
    /// Routes mapping (method, path) to linked webhook endpoints.
433    pub router: Arc<MethodAwareRouter<WebhookEndpointInstanceLinked>>,
    /// Function registry handed to every request handler.
434    pub fn_registry: Arc<dyn FunctionRegistry>,
435}
436
/// Accept loop of the webhook HTTP server.
///
/// For every TCP connection accepted on `listener` the current server state is
/// snapshotted from `wh_server_state_watcher_parent` and a tokio task is spawned
/// that serves the connection over HTTP/1. Each request receives a freshly
/// generated top-level execution id and is processed by a `RequestHandler`
/// built from the snapshot and the shared resources passed to this function.
///
/// While a connection is being served, a change reported by the state watcher
/// triggers `graceful_shutdown` on that connection; the connection keeps being
/// polled until it completes.
///
/// # Errors
///
/// Returns `WebhookServerError::SocketError` when accepting a connection fails.
/// The loop has no other exit.
437#[expect(clippy::too_many_arguments)]
438pub async fn server(
439    http_server: String,
440    listener: TcpListener,
441    engine: Arc<Engine>,
442    wh_server_state_watcher_parent: WebhookStateWatcher,
443    log_forwarder_sender: mpsc::Sender<LogInfoAppendRow>,
444    db_pool: Arc<dyn DbPool>,
445    clock_fn: Box<dyn ClockFn>,
446    sleep: Arc<dyn Sleep>,
447    max_inflight_requests: Option<Arc<tokio::sync::Semaphore>>,
448    server_termination_watcher: watch::Receiver<()>,
449) -> Result<(), WebhookServerError> {
450    loop {
451        let (stream, _) = listener
452            .accept()
453            .await
454            .map_err(WebhookServerError::SocketError)?;
        // The Debug rendering of the stream is used as an identifier in logs and spans.
455        let stream_id = format!("{stream:?}");
456        let stream = TokioIo::new(stream);
457
458        // Each connection must not affect other connections.
459        let mut wh_server_state_watcher = wh_server_state_watcher_parent.clone();
460        // Snapshot state before spawning, marking the latest value as seen.
461        let state = wh_server_state_watcher.borrow_and_update().clone();
462        let deployment_id = state.deployment_id;
463        debug!(%deployment_id, %stream_id, "Initializing connection");
464        // Spawn a tokio task for each TCP connection.
465        tokio::task::spawn(
466            {
                // Per-connection clones that are moved into the spawned task.
467                let engine = engine.clone();
468                let clock_fn = clock_fn.clone_box();
469                let sleep = sleep.clone();
470                let db_pool = db_pool.clone();
471                let http_server = http_server.clone();
472                let connection_span = info_span!("connection", %http_server);
473                let max_inflight_requests = max_inflight_requests.clone();
474                let server_termination_watcher = server_termination_watcher.clone();
475                let log_forwarder_sender = log_forwarder_sender.clone();
476                async move {
                    // The sender lives until this task finishes; every request handler gets
                    // a clone of the receiver (presumably to notice a dropped connection -
                    // the handler is not visible here, TODO confirm).
477                    let (connection_drop_sender, connection_drop_watcher) = watch::channel(());
478                    let deployment_id = state.deployment_id;
479                    let mut conn = http1::Builder::new()
480                        .serve_connection(
481                            stream,
482                            hyper::service::service_fn({
483                                move |req| {
                                    // A new top-level execution id per request.
484                                    let execution_id = ExecutionId::generate().get_top_level();
485                                    trace!(%execution_id, %deployment_id, method = %req.method(), uri = %req.uri(), "Processing request");
486                                    RequestHandler {
487                                        deployment_id: state.deployment_id,
488                                        engine: engine.clone(),
489                                        clock_fn: clock_fn.clone_box(),
490                                        sleep: sleep.clone(),
491                                        db_pool: db_pool.clone(),
492                                        fn_registry: state.fn_registry.clone(),
493                                        execution_id,
494                                        router: state.router.clone(),
495                                        connection_drop_watcher: connection_drop_watcher.clone(),
496                                        server_termination_watcher: server_termination_watcher.clone(),
497                                        log_forwarder_sender: log_forwarder_sender.clone()
498                                    }
499                                    .handle_request(req, max_inflight_requests.clone())
500                                }.instrument(info_span!(parent: &connection_span, "request", %deployment_id))
501                            })
502                        );
503                    let mut conn = Pin::new(&mut conn);
                    // Drive the connection until it completes, reacting to state changes.
504                    let res = loop {
505                        select! {
506                            result = conn.as_mut() => {
507                                break result;
508                            }
509                            changed = wh_server_state_watcher.changed() => {
510                                conn.as_mut().graceful_shutdown();
511                                if changed.is_ok() {
                                    // New state published: keep looping so the connection is
                                    // still polled to completion after the shutdown request.
512                                    let new_deployment_id = wh_server_state_watcher.borrow().deployment_id;
513                                    debug!(%http_server, "Switching to {new_deployment_id}, gracefully shutting down connection");
514                                } else {
                                    // `changed()` failed, so stop selecting on the watcher and
                                    // just await the connection.
515                                    debug!(%http_server, "Deployment watcher dropped, gracefully shutting down connection");
516                                    break conn.as_mut().await;
517                                }
518                            }
519                        }
520                    };
521                    if let Err(err) = res {
522                        info!(%http_server, "Error serving connection: {err:?}");
523                        drop(connection_drop_sender);
524                    }
525                }
526            }.instrument(debug_span!("tcp stream", %deployment_id, %stream_id))
527        );
528    }
529}
530
/// Configuration of a single webhook endpoint component.
531#[derive(Debug, Clone)]
532pub struct WebhookEndpointConfig {
    /// Id of the webhook component.
533    pub component_id: ComponentId,
    /// Stdout forwarding setting, handed to `StdOutputConfigWithSender::new` in `build`.
534    pub forward_stdout: Option<StdOutputConfig>,
    /// Stderr forwarding setting, handed to `StdOutputConfigWithSender::new` in `build`.
535    pub forward_stderr: Option<StdOutputConfig>,
536    pub env_vars: Arc<[EnvVar]>,
537    pub fuel: Option<u64>,
    /// When `true`, `link` captures a wasm backtrace on each call of an imported function.
538    pub backtrace_persist: bool,
539    pub subscription_interruption: Option<Duration>,
    /// Minimum level for stored logs; `None` leaves log storage unconfigured in `build`.
540    pub logs_store_min_level: Option<LogLevel>,
541    pub allowed_hosts: Arc<[crate::http_request_policy::AllowedHostConfig]>,
    /// JS-specific settings; when present, `link` resolves its imports.
542    pub js_config: Option<WebhookEndpointJsConfig>,
543    /// The TOML config section type for error messages
544    pub config_section_hint: crate::http_hooks::ConfigSectionHint,
545}
546
/// JS-specific part of a webhook endpoint configuration.
547#[derive(Debug, Clone)]
548pub struct WebhookEndpointJsConfig {
    /// JS source text; input to `resolve_js_imports` during `link`.
549    pub source: String,
    /// File name associated with `source`.
550    pub file_name: String,
551    /// Resolved imports: specifier → [(`js_name`, `wit_name`)].
552    /// Serialized as JSON and passed to the runtime via `__OBELISK_RESOLVED_IMPORTS__`.
    /// Overwritten by `link` with the result of `resolve_js_imports`.
553    pub resolved_imports: HashMap<String, Vec<(String, String)>>,
554}
555
/// Store data of a webhook component instance.
///
/// The host traits in this file (`HostJoinSet`, `ExecutionHost`,
/// `WebhookSupportHost`) are implemented on this type, and the host functions
/// registered in `link` reach it through `StoreContextMut::data_mut`.
/// NOTE(review): looks like one context is created per request - confirm where
/// the store is built.
556struct WebhookEndpointCtx {
557    component_id: ComponentId,
558    deployment_id: DeploymentId,
    // Time source; `schedule_json` reads `now()` from it.
559    clock_fn: Box<dyn ClockFn>,
560    sleep: Arc<dyn Sleep>,
561    db_pool: Arc<dyn DbPool>,
    // Used by `schedule_json` to look up exported functions.
562    fn_registry: Arc<dyn FunctionRegistry>,
563    table: ResourceTable,
564    wasi_ctx: WasiCtx,
565    http_ctx: WasiHttpCtx,
    // Execution id reported to the guest by `execution_id_current`.
566    execution_id: ExecutionIdTopLevel,
567    next_join_set_idx: u64,
568    version: Option<Version>,
569    component_logger: ComponentLogger,
570    subscription_interruption: Option<Duration>,
571    connection_drop_watcher: watch::Receiver<()>,
572    server_termination_watcher: watch::Receiver<()>,
573    http_hooks: HttpHooks,
574}
575
576impl HostJoinSet for WebhookEndpointCtx {
577    fn id(&mut self, _resource: wasmtime::component::Resource<JoinSetId>) -> String {
578        unreachable!("webhook endpoint instances cannot obtain `join-set-id` resource")
579    }
580
581    fn submit_delay(
582        &mut self,
583        _self_: wasmtime::component::Resource<JoinSetId>,
584        _timeout: types::obelisk::types::time::ScheduleAt,
585    ) -> types::obelisk::types::execution::DelayId {
586        unreachable!("webhook endpoint instances cannot obtain `join-set-id` resource")
587    }
588
589    fn join_next(
590        &mut self,
591        _self_: wasmtime::component::Resource<JoinSetId>,
592    ) -> Result<(types::obelisk::types::execution::ResponseId, Result<(), ()>), JoinNextError> {
593        unreachable!("webhook endpoint instances cannot obtain `join-set-id` resource")
594    }
595
596    fn drop(
597        &mut self,
598        _resource: wasmtime::component::Resource<JoinSetId>,
599    ) -> wasmtime::Result<()> {
600        unreachable!("webhook endpoint instances cannot obtain `join-set-id` resource")
601    }
602}
603
604impl ExecutionHost for WebhookEndpointCtx {
605    fn convert_schedule_json_error(
606        &mut self,
607        err: ScheduleJsonErrorTrappable,
608    ) -> wasmtime::Result<types::obelisk::types::execution::ScheduleJsonError> {
609        match err {
610            ScheduleJsonErrorTrappable::Normal(err) => Ok(err),
611            ScheduleJsonErrorTrappable::Trap(err) => Err(err),
612        }
613    }
614}
615
616fn wit_backtrace_to_storage(
617    bt: types::obelisk::types::backtrace::WasmBacktrace,
618) -> concepts::storage::WasmBacktrace {
619    concepts::storage::WasmBacktrace {
620        frames: bt
621            .frames
622            .into_iter()
623            .map(|f| concepts::storage::FrameInfo {
624                module: f.module,
625                func_name: f.func_name,
626                symbols: f
627                    .symbols
628                    .into_iter()
629                    .map(|s| concepts::storage::FrameSymbol {
630                        func_name: s.func_name,
631                        file: s.file,
632                        line: s.line,
633                        col: s.col,
634                    })
635                    .collect(),
636            })
637            .collect(),
638    }
639}
640
641impl WebhookSupportHost for WebhookEndpointCtx {
642    async fn execution_id_generate(
643        &mut self,
644    ) -> wasmtime::Result<types::obelisk::webhook::webhook_support::ExecutionId> {
645        let execution_id = ExecutionId::generate();
646        Ok(types::obelisk::webhook::webhook_support::ExecutionId {
647            id: execution_id.to_string(),
648        })
649    }
650
651    /// Deprecated, use `execution_id_current`.
652    async fn current_execution_id(
653        &mut self,
654    ) -> wasmtime::Result<types::obelisk::webhook::webhook_support::ExecutionId> {
655        self.execution_id_current().await
656    }
657
658    async fn execution_id_current(
659        &mut self,
660    ) -> wasmtime::Result<types::obelisk::webhook::webhook_support::ExecutionId> {
661        Ok(types::obelisk::webhook::webhook_support::ExecutionId {
662            id: self.execution_id.to_string(),
663        })
664    }
665
666    fn convert_get_status_error(
667        &mut self,
668        err: GetStatusErrorTrappable,
669    ) -> wasmtime::Result<types::obelisk::webhook::webhook_support::GetStatusError> {
670        match err {
671            GetStatusErrorTrappable::Normal(err) => Ok(err),
672            GetStatusErrorTrappable::Trap(err) => Err(err),
673        }
674    }
675
676    fn convert_get_error(
677        &mut self,
678        err: GetErrorTrappable,
679    ) -> wasmtime::Result<types::obelisk::webhook::webhook_support::GetError> {
680        match err {
681            GetErrorTrappable::Normal(err) => Ok(err),
682            GetErrorTrappable::Trap(err) => Err(err),
683        }
684    }
685
686    fn convert_try_get_error(
687        &mut self,
688        err: TryGetErrorTrappable,
689    ) -> wasmtime::Result<types::obelisk::webhook::webhook_support::TryGetError> {
690        match err {
691            TryGetErrorTrappable::Normal(err) => Ok(err),
692            TryGetErrorTrappable::Trap(err) => Err(err),
693        }
694    }
695
696    async fn schedule_json(
697        &mut self,
698        execution_id: types::obelisk::webhook::webhook_support::ExecutionId,
699        schedule_at: types::obelisk::webhook::webhook_support::ScheduleAt,
700        function: types::obelisk::webhook::webhook_support::Function,
701        params: String,
702        _config: Option<types::obelisk::webhook::webhook_support::SubmitConfig>,
703        backtrace: Option<types::obelisk::types::backtrace::WasmBacktrace>,
704    ) -> Result<(), ScheduleJsonErrorTrappable> {
705        use types::obelisk::types::execution::ScheduleJsonError;
706
707        // Parse the execution ID
708        let execution_id = match concepts::ExecutionId::from_str(&execution_id.id) {
709            Ok(id) => id,
710            Err(err) => {
711                let msg = format!("schedule-json: invalid execution ID: {err}");
712                self.error(msg.clone());
713                return Err(ScheduleJsonError::FfqnParsingError(msg).into());
714            }
715        };
716
717        // Parse the function FFQN
718        let ffqn =
719            match FunctionFqn::try_from_tuple(&function.interface_name, &function.function_name) {
720                Ok(ffqn) => ffqn,
721                Err(err) => {
722                    let msg = format!("schedule-json: invalid function name: {err}");
723                    self.error(msg.clone());
724                    return Err(ScheduleJsonError::FfqnParsingError(msg).into());
725                }
726            };
727
728        // Look up function in registry
729        let Some((fn_metadata, component_id)) = self.fn_registry.get_by_exported_function(&ffqn)
730        else {
731            self.error("schedule-json: function not found".to_string());
732            return Err(ScheduleJsonError::FunctionNotFound.into());
733        };
734
735        // Parse params JSON array
736        let params_json: Vec<serde_json::Value> = match serde_json::from_str(&params) {
737            Ok(serde_json::Value::Array(arr)) => arr,
738            Ok(_) => {
739                let msg = "schedule-json: params must be a JSON array".to_string();
740                self.error(msg.clone());
741                return Err(ScheduleJsonError::TypeCheckError(msg).into());
742            }
743            Err(err) => {
744                let msg = format!("schedule-json: cannot parse params as JSON: {err}");
745                self.error(msg.clone());
746                return Err(ScheduleJsonError::TypeCheckError(msg).into());
747            }
748        };
749
750        // Type check and convert params
751        let params = match Params::from_json_values(
752            Arc::from(params_json),
753            fn_metadata
754                .parameter_types
755                .iter()
756                .map(|param_type| &param_type.type_wrapper),
757        ) {
758            Ok(params) => params,
759            Err(err) => {
760                let msg = format!("schedule-json: params type checking failed: {err}");
761                self.error(msg.clone());
762                return Err(ScheduleJsonError::TypeCheckError(msg).into());
763            }
764        };
765
766        // Convert schedule_at
767        let history_event_schedule_at = schedule_at_from_webhook(schedule_at);
768        let created_at = self.clock_fn.now();
769        let schedule_at = match history_event_schedule_at.as_date_time(created_at) {
770            Ok(dt) => dt,
771            Err(err) => {
772                let msg = format!("schedule-json: invalid schedule-at: {err}");
773                self.error(msg.clone());
774                return Err(ScheduleJsonError::TypeCheckError(msg).into());
775            }
776        };
777
778        // Create execution in database
779        let version = match self.get_version_or_create().await {
780            Ok(v) => v,
781            Err(err) => {
782                return Err(wasmtime::Error::msg(format!("database error: {err:?}")).into());
783            }
784        };
785
786        let event = HistoryEvent::Schedule {
787            execution_id: execution_id.clone(),
788            schedule_at: history_event_schedule_at,
789            result: Ok(()),
790        };
791        let append_req = AppendRequest {
792            event: ExecutionRequest::HistoryEvent { event },
793            created_at,
794        };
795        let create_req = CreateRequest {
796            created_at,
797            execution_id: execution_id.clone(),
798            ffqn,
799            params,
800            parent: None,
801            metadata: ExecutionMetadata::from_linked_span(&self.component_logger.span),
802            scheduled_at: schedule_at,
803            component_id: component_id.clone(),
804            deployment_id: self.deployment_id,
805            scheduled_by: Some(ExecutionId::TopLevel(self.execution_id)),
806            paused: false,
807        };
808
809        let db_connection = match self.db_pool.connection().await {
810            Ok(conn) => conn,
811            Err(err) => {
812                return Err(
813                    wasmtime::Error::msg(format!("database connection error: {err:?}")).into(),
814                );
815            }
816        };
817
818        let backtrace_infos: Vec<BacktraceInfo> = backtrace
819            .map(|bt| BacktraceInfo {
820                execution_id: ExecutionId::TopLevel(self.execution_id),
821                component_id: self.component_id.clone(),
822                version_min_including: version.clone(),
823                version_max_excluding: Version(version.0 + 1),
824                wasm_backtrace: wit_backtrace_to_storage(bt),
825            })
826            .into_iter()
827            .collect();
828
829        match db_connection
830            .append_batch_create_new_execution(
831                created_at,
832                vec![append_req],
833                ExecutionId::TopLevel(self.execution_id),
834                version.clone(),
835                vec![create_req],
836                backtrace_infos,
837            )
838            .await
839        {
840            Ok(new_version) => {
841                self.version = Some(new_version);
842                Ok(())
843            }
844            Err(err) => Err(wasmtime::Error::msg(format!("database write error: {err:?}")).into()),
845        }
846    }
847
848    async fn call_json(
849        &mut self,
850        function: types::obelisk::webhook::webhook_support::Function,
851        params: String,
852        _config: Option<types::obelisk::webhook::webhook_support::SubmitConfig>,
853        backtrace: Option<types::obelisk::types::backtrace::WasmBacktrace>,
854    ) -> Result<Result<Option<String>, Option<String>>, ScheduleJsonErrorTrappable> {
855        use types::obelisk::types::execution::ScheduleJsonError;
856
857        // Parse the function FFQN
858        let ffqn =
859            match FunctionFqn::try_from_tuple(&function.interface_name, &function.function_name) {
860                Ok(ffqn) => ffqn,
861                Err(err) => {
862                    let msg = format!("call-json: invalid function name: {err}");
863                    self.error(msg.clone());
864                    return Err(ScheduleJsonError::FfqnParsingError(msg).into());
865                }
866            };
867
868        // Look up function in registry
869        let Some((fn_metadata, component_id)) = self.fn_registry.get_by_exported_function(&ffqn)
870        else {
871            self.error("call-json: function not found".to_string());
872            return Err(ScheduleJsonError::FunctionNotFound.into());
873        };
874
875        // Parse params JSON array
876        let params_json: Vec<serde_json::Value> = match serde_json::from_str(&params) {
877            Ok(serde_json::Value::Array(arr)) => arr,
878            Ok(_) => {
879                let msg = "call-json: params must be a JSON array".to_string();
880                self.error(msg.clone());
881                return Err(ScheduleJsonError::TypeCheckError(msg).into());
882            }
883            Err(err) => {
884                let msg = format!("call-json: cannot parse params as JSON: {err}");
885                self.error(msg.clone());
886                return Err(ScheduleJsonError::TypeCheckError(msg).into());
887            }
888        };
889
890        // Type check and convert params
891        let params = match Params::from_json_values(
892            Arc::from(params_json),
893            fn_metadata
894                .parameter_types
895                .iter()
896                .map(|param_type| &param_type.type_wrapper),
897        ) {
898            Ok(params) => params,
899            Err(err) => {
900                let msg = format!("call-json: params type checking failed: {err}");
901                self.error(msg.clone());
902                return Err(ScheduleJsonError::TypeCheckError(msg).into());
903            }
904        };
905
906        // Get or create version
907        let version = match self.get_version_or_create().await {
908            Ok(v) => v,
909            Err(err) => {
910                return Err(wasmtime::Error::msg(format!("database error: {err:?}")).into());
911            }
912        };
913
914        // Create a OneOff join set and child execution ID
915        let (join_set_id, child_execution_id) = self.create_oneoff_join_set();
916
917        let created_at = self.clock_fn.now();
918
919        // 1. Create join set
920        let req_join_set_created = AppendRequest {
921            created_at,
922            event: ExecutionRequest::HistoryEvent {
923                event: HistoryEvent::JoinSetCreate {
924                    join_set_id: join_set_id.clone(),
925                },
926            },
927        };
928
929        // 2. Create child execution request
930        let req_child_exec = AppendRequest {
931            created_at,
932            event: ExecutionRequest::HistoryEvent {
933                event: HistoryEvent::JoinSetRequest {
934                    join_set_id: join_set_id.clone(),
935                    request: JoinSetRequest::ChildExecutionRequest {
936                        child_execution_id: child_execution_id.clone(),
937                        target_ffqn: ffqn.clone(),
938                        params: params.clone(),
939                        result: Ok(()),
940                    },
941                },
942            },
943        };
944
945        // 3. Add JoinNext to wait for result
946        let req_join_next = AppendRequest {
947            created_at,
948            event: ExecutionRequest::HistoryEvent {
949                event: HistoryEvent::JoinNext {
950                    join_set_id: join_set_id.clone(),
951                    run_expires_at: created_at,
952                    closing: false,
953                    requested_ffqn: Some(ffqn.clone()),
954                },
955            },
956        };
957
958        // Create the child execution
959        let req_create_child = CreateRequest {
960            created_at,
961            execution_id: ExecutionId::Derived(child_execution_id.clone()),
962            ffqn,
963            params,
964            parent: Some((ExecutionId::TopLevel(self.execution_id), join_set_id)),
965            metadata: ExecutionMetadata::from_parent_span(&self.component_logger.span),
966            scheduled_at: created_at,
967            component_id: component_id.clone(),
968            deployment_id: self.deployment_id,
969            scheduled_by: None,
970            paused: false,
971        };
972
973        let db_connection = match self.db_pool.connection().await {
974            Ok(conn) => conn,
975            Err(err) => {
976                return Err(
977                    wasmtime::Error::msg(format!("database connection error: {err:?}")).into(),
978                );
979            }
980        };
981
982        let appended = vec![req_join_set_created, req_child_exec, req_join_next];
983
984        let backtrace_infos: Vec<BacktraceInfo> = backtrace
985            .map(|bt| BacktraceInfo {
986                execution_id: ExecutionId::TopLevel(self.execution_id),
987                component_id: self.component_id.clone(),
988                version_min_including: version.clone(),
989                version_max_excluding: Version(version.0 + 3),
990                wasm_backtrace: wit_backtrace_to_storage(bt),
991            })
992            .into_iter()
993            .collect();
994
995        match db_connection
996            .append_batch_create_new_execution(
997                created_at,
998                appended,
999                ExecutionId::TopLevel(self.execution_id),
1000                version,
1001                vec![req_create_child],
1002                backtrace_infos,
1003            )
1004            .await
1005        {
1006            Ok(new_version) => {
1007                self.version = Some(new_version);
1008            }
1009            Err(err) => {
1010                return Err(wasmtime::Error::msg(format!("database write error: {err:?}")).into());
1011            }
1012        }
1013
1014        // Wait for the result
1015        let result = Self::wait_for_finished_result(
1016            self.subscription_interruption,
1017            &self.sleep,
1018            db_connection.as_ref(),
1019            &ExecutionId::Derived(child_execution_id),
1020            &self.connection_drop_watcher,
1021            &self.server_termination_watcher,
1022        )
1023        .await;
1024
1025        match result {
1026            Ok(retval) => Ok(supported_return_value_to_json_result(&retval)),
1027            Err(err) => Err(wasmtime::Error::msg(format!("execution error: {err:?}")).into()),
1028        }
1029    }
1030
1031    async fn get_status(
1032        &mut self,
1033        execution_id: types::obelisk::webhook::webhook_support::ExecutionId,
1034        _backtrace: Option<types::obelisk::types::backtrace::WasmBacktrace>,
1035    ) -> Result<types::obelisk::webhook::webhook_support::ExecutionStatus, GetStatusErrorTrappable>
1036    {
1037        use types::obelisk::webhook::webhook_support::{
1038            ExecutionStatus, ExecutionStatusFinished, GetStatusError,
1039        };
1040
1041        // Parse the execution ID
1042        let execution_id = match concepts::ExecutionId::from_str(&execution_id.id) {
1043            Ok(id) => id,
1044            Err(err) => {
1045                let msg = format!("get-status: cannot parse execution ID: {err}");
1046                self.error(msg.clone());
1047                return Err(GetStatusError::ExecutionIdParsingError(msg).into());
1048            }
1049        };
1050
1051        // Get execution status from database
1052        let db_connection = match self.db_pool.connection().await {
1053            Ok(conn) => conn,
1054            Err(err) => {
1055                return Err(
1056                    wasmtime::Error::msg(format!("database connection error: {err:?}")).into(),
1057                );
1058            }
1059        };
1060
1061        let execution_with_state = match db_connection.get_pending_state(&execution_id).await {
1062            Ok(state) => state,
1063            Err(DbErrorRead::NotFound) => {
1064                return Err(GetStatusError::NotFound.into());
1065            }
1066            Err(err) => {
1067                return Err(wasmtime::Error::msg(format!("database read error: {err:?}")).into());
1068            }
1069        };
1070
1071        // Convert PendingState to ExecutionStatus
1072        let status = match execution_with_state.pending_state {
1073            PendingState::PendingAt(state) => {
1074                ExecutionStatus::PendingAt(types::obelisk::types::time::Datetime {
1075                    seconds: u64::try_from(state.scheduled_at.timestamp())
1076                        .expect("pending at before unix epoch is unsupported"),
1077                    nanoseconds: state.scheduled_at.timestamp_subsec_nanos(),
1078                })
1079            }
1080            PendingState::Locked(_) => ExecutionStatus::Locked,
1081            PendingState::BlockedByJoinSet(_) => ExecutionStatus::BlockedByJoinSet,
1082            PendingState::Paused(_) => ExecutionStatus::Paused,
1083            PendingState::Finished(finished) => {
1084                let finished_status = match finished.result_kind {
1085                    PendingStateFinishedResultKind::Ok => ExecutionStatusFinished::Ok,
1086                    PendingStateFinishedResultKind::Err(PendingStateFinishedError::Error) => {
1087                        ExecutionStatusFinished::Err
1088                    }
1089                    PendingStateFinishedResultKind::Err(
1090                        PendingStateFinishedError::ExecutionFailure(_),
1091                    ) => ExecutionStatusFinished::ExecutionFailure,
1092                };
1093                ExecutionStatus::Finished(finished_status)
1094            }
1095        };
1096
1097        Ok(status)
1098    }
1099
1100    async fn get(
1101        &mut self,
1102        execution_id: types::obelisk::webhook::webhook_support::ExecutionId,
1103        _backtrace: Option<types::obelisk::types::backtrace::WasmBacktrace>,
1104    ) -> Result<Result<Option<String>, Option<String>>, GetErrorTrappable> {
1105        use types::obelisk::webhook::webhook_support::GetError;
1106
1107        // Parse the execution ID
1108        let parsed_execution_id: concepts::ExecutionId =
1109            match concepts::ExecutionId::from_str(&execution_id.id) {
1110                Ok(id) => id,
1111                Err(err) => {
1112                    let msg = format!("get: cannot parse execution ID: {err}");
1113                    self.error(msg.clone());
1114                    return Err(GetError::ExecutionIdParsingError(msg).into());
1115                }
1116            };
1117
1118        // Get database connection
1119        let db_connection = match self.db_pool.connection().await {
1120            Ok(conn) => conn,
1121            Err(err) => {
1122                return Err(
1123                    wasmtime::Error::msg(format!("database connection error: {err:?}")).into(),
1124                );
1125            }
1126        };
1127
1128        // Extract needed values before the async call to avoid borrowing issues
1129        let subscription_interruption = self.subscription_interruption;
1130        let sleep = self.sleep.clone();
1131        let connection_drop_watcher = self.connection_drop_watcher.clone();
1132        let server_termination_watcher = self.server_termination_watcher.clone();
1133
1134        // Wait for the execution to finish
1135        let result = Self::wait_for_finished_result(
1136            subscription_interruption,
1137            &sleep,
1138            db_connection.as_ref(),
1139            &parsed_execution_id,
1140            &connection_drop_watcher,
1141            &server_termination_watcher,
1142        )
1143        .await;
1144
1145        match result {
1146            Ok(retval) => Ok(supported_return_value_to_json_result(&retval)),
1147            Err(err) => Err(wasmtime::Error::msg(format!("execution error: {err:?}")).into()),
1148        }
1149    }
1150
1151    async fn try_get(
1152        &mut self,
1153        execution_id: types::obelisk::webhook::webhook_support::ExecutionId,
1154        _backtrace: Option<types::obelisk::types::backtrace::WasmBacktrace>,
1155    ) -> Result<Result<Option<String>, Option<String>>, TryGetErrorTrappable> {
1156        use types::obelisk::webhook::webhook_support::TryGetError;
1157
1158        // Parse the execution ID
1159        let parsed_execution_id: concepts::ExecutionId =
1160            match concepts::ExecutionId::from_str(&execution_id.id) {
1161                Ok(id) => id,
1162                Err(err) => {
1163                    let msg = format!("try-get: cannot parse execution ID: {err}");
1164                    self.error(msg.clone());
1165                    return Err(TryGetError::ExecutionIdParsingError(msg).into());
1166                }
1167            };
1168
1169        // Get database connection
1170        let db_connection = match self.db_pool.connection().await {
1171            Ok(conn) => conn,
1172            Err(err) => {
1173                return Err(
1174                    wasmtime::Error::msg(format!("database connection error: {err:?}")).into(),
1175                );
1176            }
1177        };
1178
1179        // Get execution state
1180        let execution_with_state = match db_connection.get_pending_state(&parsed_execution_id).await
1181        {
1182            Ok(state) => state,
1183            Err(DbErrorRead::NotFound) => {
1184                self.error(format!("try-get: execution not found: {}", execution_id.id));
1185                return Err(TryGetError::NotFound.into());
1186            }
1187            Err(err) => {
1188                return Err(wasmtime::Error::msg(format!("database read error: {err:?}")).into());
1189            }
1190        };
1191
1192        // Check if finished
1193        match execution_with_state.pending_state {
1194            PendingState::Finished(_) => {
1195                // Get the actual result
1196                match db_connection
1197                    .wait_for_finished_result(&parsed_execution_id, None)
1198                    .await
1199                {
1200                    Ok(retval) => Ok(supported_return_value_to_json_result(&retval)),
1201                    Err(err) => {
1202                        Err(wasmtime::Error::msg(format!("database read error: {err:?}")).into())
1203                    }
1204                }
1205            }
1206            _ => Err(TryGetError::NotFinishedYet.into()),
1207        }
1208    }
1209}
1210
1211#[derive(thiserror::Error, Debug, Clone)]
1212
1213enum WebhookEndpointFunctionError {
1214    #[error(transparent)]
1215    DbError(#[from] DbErrorWrite),
1216    #[error(transparent)]
1217    FinishedExecutionFailure(#[from] FinishedExecutionFailure),
1218    #[error("uncategorized error: {0}")]
1219    UncategorizedError(&'static str),
1220    #[error("connection closed")]
1221    ConnectionClosed,
1222}
1223impl From<DbErrorGeneric> for WebhookEndpointFunctionError {
1224    fn from(value: DbErrorGeneric) -> Self {
1225        WebhookEndpointFunctionError::DbError(DbErrorWrite::Generic(value))
1226    }
1227}
1228
1229impl wasmtime::component::HasData for WebhookEndpointCtx {
1230    type Data<'a> = &'a mut WebhookEndpointCtx;
1231}
1232
1233impl WebhookEndpointCtx {
1234    // Create new execution if this is the first call of the request/response cycle
1235    async fn get_version_or_create(&mut self) -> Result<Version, DbErrorWrite> {
1236        if let Some(found) = &self.version {
1237            return Ok(found.clone());
1238        }
1239        let created_at = self.clock_fn.now();
1240        // Associate the top level execution with the request span. Allows to find the trace by execution id.
1241        let metadata = concepts::ExecutionMetadata::from_parent_span(&self.component_logger.span);
1242        let create_request = CreateRequest {
1243            created_at,
1244            execution_id: ExecutionId::TopLevel(self.execution_id),
1245            ffqn: HTTP_HANDLER_FFQN,
1246            params: Params::empty(),
1247            parent: None,
1248            metadata,
1249            scheduled_at: created_at,
1250            component_id: self.component_id.clone(),
1251            deployment_id: self.deployment_id,
1252            scheduled_by: None,
1253            paused: false,
1254        };
1255        let conn = self.db_pool.connection().await?;
1256        let version = conn.create(create_request).await?;
1257        self.version = Some(version.clone());
1258        Ok(version)
1259    }
1260
1261    /// Create a new `OneOff` join set and return its ID along with a child execution ID.
1262    fn create_oneoff_join_set(&mut self) -> (JoinSetId, ExecutionIdDerived) {
1263        let join_set_id = JoinSetId::new(
1264            JoinSetKind::OneOff,
1265            StrVariant::from(self.next_join_set_idx.to_string()),
1266        )
1267        .expect("numeric names must be allowed");
1268        self.next_join_set_idx += 1;
1269        let child_execution_id = ExecutionId::TopLevel(self.execution_id).next_level(&join_set_id);
1270        (join_set_id, child_execution_id)
1271    }
1272
1273    #[instrument(skip_all, fields(%ffqn, version, %execution_id = self.execution_id))]
1274    async fn call_imported_fn(
1275        &mut self,
1276        ffqn: FunctionFqn,
1277        params: &[Val],
1278        results: &mut [Val],
1279        wasm_backtrace: Option<concepts::storage::WasmBacktrace>,
1280    ) -> Result<(), WebhookEndpointFunctionError> {
1281        trace!(?params, "call_imported_fn start");
1282        assert_eq!(
1283            1,
1284            results.len(),
1285            "direct call: no-ext export must return `result`, -schedule returns `execuiton-id`"
1286        );
1287
1288        if self.connection_drop_watcher.has_changed().is_err()
1289            || self.server_termination_watcher.has_changed().is_err()
1290        {
1291            debug!("Cancellation request detected");
1292            return Err(WebhookEndpointFunctionError::ConnectionClosed);
1293        }
1294
1295        if let Some(package_name) = ffqn.ifc_fqn.package_strip_obelisk_schedule_suffix() {
1296            // -schedule
1297            let ifc_fqn = IfcFqnName::from_parts(
1298                ffqn.ifc_fqn.namespace(),
1299                package_name,
1300                ffqn.ifc_fqn.ifc_name(),
1301                ffqn.ifc_fqn.version(),
1302            );
1303            if let Some(function_name) = ffqn.function_name.strip_suffix(SUFFIX_FN_SCHEDULE) {
1304                let ffqn =
1305                    FunctionFqn::new_arc(Arc::from(ifc_fqn.to_string()), Arc::from(function_name));
1306                debug!("Got `-schedule` extension for {ffqn}");
1307                let Some((schedule_at, params)) = params.split_first() else {
1308                    error!(
1309                        "Error running `-schedule` extension function: exepcted at least one parameter of type `schedule-at`, got empty parameter list"
1310                    );
1311                    return Err(WebhookEndpointFunctionError::UncategorizedError(
1312                        "error running `-schedule` extension function: exepcted at least one parameter of type `schedule-at`, got empty parameter list",
1313                    ));
1314                };
1315                let schedule_at =
1316                    WastVal::try_from(schedule_at.clone()).map_err(|err| {
1317                        error!("Error running `-schedule` extension function: cannot convert to internal representation - {err:?}");
1318                        WebhookEndpointFunctionError::UncategorizedError(
1319                            "error running `-schedule` extension function: cannot convert to internal representation",
1320                        )
1321                    })?;
1322                let schedule_at = match history_event_schedule_at_from_wast_val(&schedule_at) {
1323                    Ok(ok) => ok,
1324                    Err(err) => {
1325                        error!(
1326                            "Wrong type for the first `-schedule` extension function parameter, expected `schedule-at`, got `{schedule_at:?}` - {err:?}"
1327                        );
1328                        return Err(WebhookEndpointFunctionError::UncategorizedError(
1329                            "error running `-schedule` extension function: wrong first parameter type",
1330                        ));
1331                    }
1332                };
1333                // Write to db
1334                let version = self.get_version_or_create().await?;
1335                let span = Span::current();
1336                span.record("version", tracing::field::display(&version));
1337                let new_execution_id = ExecutionId::generate();
1338                let (_function_metadata, child_component_id) = self
1339                    .fn_registry
1340                    .get_by_exported_function(&ffqn)
1341                    .expect("target function must be found in fn_registry");
1342                let created_at = self.clock_fn.now();
1343
1344                let event = HistoryEvent::Schedule {
1345                    execution_id: new_execution_id.clone(),
1346                    schedule_at,
1347                    result: Ok(()),
1348                };
1349                let schedule_at = schedule_at.as_date_time(created_at).map_err(|_err| {
1350                    WebhookEndpointFunctionError::UncategorizedError("schedule-at conversion error")
1351                })?;
1352                let child_exec_req = AppendRequest {
1353                    event: ExecutionRequest::HistoryEvent { event },
1354                    created_at,
1355                };
1356
1357                let create_child_req = CreateRequest {
1358                    created_at,
1359                    execution_id: new_execution_id.clone(),
1360                    ffqn,
1361                    params: Params::from_wasmtime(Arc::from(params)),
1362                    parent: None, // Schedule breaks from the parent-child relationship to avoid a linked list
1363                    metadata: ExecutionMetadata::from_linked_span(&self.component_logger.span),
1364                    scheduled_at: schedule_at,
1365                    component_id: child_component_id.clone(),
1366                    deployment_id: self.deployment_id,
1367                    scheduled_by: Some(ExecutionId::TopLevel(self.execution_id)),
1368                    paused: false,
1369                };
1370                let db_connection = self.db_pool.connection().await?;
1371                let expected_next_version = version.increment();
1372                let backtrace_info = wasm_backtrace.map(|wasm_backtrace| BacktraceInfo {
1373                    execution_id: ExecutionId::TopLevel(self.execution_id),
1374                    component_id: self.component_id.clone(),
1375                    version_min_including: version.clone(),
1376                    version_max_excluding: expected_next_version.clone(),
1377                    wasm_backtrace,
1378                });
1379                let version = db_connection
1380                    .append_batch_create_new_execution(
1381                        created_at,
1382                        vec![child_exec_req],
1383                        ExecutionId::TopLevel(self.execution_id),
1384                        version.clone(),
1385                        vec![create_child_req],
1386                        backtrace_info.into_iter().collect(),
1387                    )
1388                    .await?;
1389                assert_eq!(version, expected_next_version); // Expected for backtrace's version_max_excluding
1390                self.version = Some(version.clone());
1391                results[0] = execution_id_into_val(&new_execution_id);
1392            } else {
1393                error!("unrecognized `{SUFFIX_PKG_SCHEDULE}` extension function {ffqn}");
1394                return Err(WebhookEndpointFunctionError::UncategorizedError(
1395                    "unrecognized extension function",
1396                ));
1397            }
1398        } else {
1399            // direct call
1400            let version = self.get_version_or_create().await?;
1401            let span = Span::current();
1402            span.record("version", tracing::field::display(&version));
1403            let (join_set_id_direct, child_execution_id) = self.create_oneoff_join_set();
1404            let created_at = self.clock_fn.now();
1405            let (fn_metadata, child_component_id) = self
1406                .fn_registry
1407                .get_by_exported_function(&ffqn)
1408                .expect("import was mocked using fn_registry exports limited to -schedule and no-ext functions");
1409            assert!(
1410                fn_metadata.extension.is_none(),
1411                "direct call: function must be no-ext"
1412            );
1413            let return_type_tl = assert_matches!(fn_metadata.return_type, ReturnType::Extendable(compatible) => compatible.type_wrapper_tl);
1414
1415            let req_join_set_created = AppendRequest {
1416                created_at,
1417                event: ExecutionRequest::HistoryEvent {
1418                    event: HistoryEvent::JoinSetCreate {
1419                        join_set_id: join_set_id_direct.clone(),
1420                    },
1421                },
1422            };
1423            let params = Params::from_wasmtime(Arc::from(params));
1424            let req_child_exec = AppendRequest {
1425                created_at,
1426                event: ExecutionRequest::HistoryEvent {
1427                    event: HistoryEvent::JoinSetRequest {
1428                        join_set_id: join_set_id_direct.clone(),
1429                        request: JoinSetRequest::ChildExecutionRequest {
1430                            child_execution_id: child_execution_id.clone(),
1431                            target_ffqn: ffqn.clone(),
1432                            params: params.clone(),
1433                            result: Ok(()),
1434                        },
1435                    },
1436                },
1437            };
1438            let req_join_next = AppendRequest {
1439                created_at,
1440                event: ExecutionRequest::HistoryEvent {
1441                    event: HistoryEvent::JoinNext {
1442                        join_set_id: join_set_id_direct.clone(),
1443                        run_expires_at: created_at, // does not matter what the pending state is.
1444                        closing: false,
1445                        requested_ffqn: Some(ffqn.clone()), // only needed for workflows but added for consistency.
1446                    },
1447                },
1448            };
1449            let req_create_child = CreateRequest {
1450                created_at,
1451                execution_id: ExecutionId::Derived(child_execution_id.clone()),
1452                ffqn: ffqn.clone(),
1453                params,
1454                parent: Some((ExecutionId::TopLevel(self.execution_id), join_set_id_direct)),
1455                metadata: ExecutionMetadata::from_parent_span(&self.component_logger.span),
1456                scheduled_at: created_at,
1457                component_id: child_component_id.clone(),
1458                deployment_id: self.deployment_id,
1459                scheduled_by: None,
1460                paused: false,
1461            };
1462            let db_connection = self.db_pool.connection().await?;
1463            let appended = vec![req_join_set_created, req_child_exec, req_join_next];
1464            let expected_next_version = Version(version.0 + 3);
1465            let backtrace_info = wasm_backtrace.map(|wasm_backtrace| BacktraceInfo {
1466                execution_id: ExecutionId::TopLevel(self.execution_id),
1467                component_id: self.component_id.clone(),
1468                version_min_including: version.clone(),
1469                version_max_excluding: expected_next_version.clone(),
1470                wasm_backtrace,
1471            });
1472
1473            let version = db_connection
1474                .append_batch_create_new_execution(
1475                    created_at,
1476                    appended,
1477                    ExecutionId::TopLevel(self.execution_id),
1478                    version,
1479                    vec![req_create_child],
1480                    backtrace_info.into_iter().collect(),
1481                )
1482                .await?;
1483            assert_eq!(version, expected_next_version); // Expected for backtrace's version_max_excluding
1484            self.version = Some(version);
1485
1486            let res = Self::wait_for_finished_result(
1487                self.subscription_interruption,
1488                &self.sleep,
1489                db_connection.as_ref(),
1490                &ExecutionId::Derived(child_execution_id),
1491                &self.connection_drop_watcher,
1492                &self.server_termination_watcher,
1493            )
1494            .await?;
1495            results[0] = res.into_wast_val(move || return_type_tl).as_val();
1496
1497            trace!(?results, "call_imported_fn finish");
1498        }
1499        Ok(())
1500    }
1501
1502    async fn wait_for_finished_result(
1503        subscription_interruption: Option<Duration>,
1504        sleep: &Arc<dyn Sleep>,
1505        db_connection: &dyn DbConnection,
1506        execution_id: &ExecutionId,
1507        connection_drop_watcher: &watch::Receiver<()>,
1508        server_termination_watcher: &watch::Receiver<()>,
1509    ) -> Result<SupportedFunctionReturnValue, WebhookEndpointFunctionError> {
1510        let timeout_factory = move || {
1511            let subscription_interruption = subscription_interruption.unwrap_or(Duration::MAX);
1512            let sleep = sleep.clone();
1513            let mut connection_drop_watcher = connection_drop_watcher.clone();
1514            let mut server_termination_watcher = server_termination_watcher.clone();
1515            Box::pin(async move {
1516                select! {
1517                    () = sleep.sleep(subscription_interruption) => TimeoutOutcome::Timeout,
1518                    _ = connection_drop_watcher.changed() => TimeoutOutcome::Cancel,
1519                    _ = server_termination_watcher.changed() => TimeoutOutcome::Cancel,
1520                }
1521            })
1522        };
1523
1524        loop {
1525            let timeout = timeout_factory();
1526            let res = db_connection
1527                .wait_for_finished_result(execution_id, Some(timeout))
1528                .await;
1529            match res {
1530                Ok(ok) => {
1531                    trace!("Finished ok");
1532                    return Ok(ok);
1533                }
1534                Err(DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout)) => {
1535                    trace!("Timeout triggers resubscribing");
1536                }
1537                Err(DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Cancel)) => {
1538                    debug!("Connection closed, not waiting for result");
1539                    return Err(WebhookEndpointFunctionError::ConnectionClosed);
1540                }
1541                Err(DbErrorReadWithTimeout::DbErrorRead(err)) => {
1542                    warn!("Database error: {err:?}");
1543                    return Err(WebhookEndpointFunctionError::from(DbErrorWrite::from(err)));
1544                }
1545            }
1546        }
1547    }
1548
1549    fn add_to_linker(linker: &mut Linker<WebhookEndpointCtx>) -> Result<(), WasmFileError> {
1550        // link obelisk:log
1551        log_activities::obelisk::log::log::add_to_linker::<_, WebhookEndpointCtx>(linker, |x| x)
1552            .map_err(|err| WasmFileError::linking_error("cannot link log activities", err))?;
1553        // link obelisk:types
1554        types::obelisk::types::execution::add_to_linker::<_, WebhookEndpointCtx>(linker, |x| x)
1555            .map_err(|err| WasmFileError::linking_error("cannot link obelisk:types", err))?;
1556        // link obelisk:webhook/webhook-support
1557        types::obelisk::webhook::webhook_support::add_to_linker::<_, WebhookEndpointCtx>(
1558            linker,
1559            |x| x,
1560        )
1561        .map_err(|err| WasmFileError::linking_error("cannot link obelisk:webhook", err))?;
1562        Ok(())
1563    }
1564
1565    #[must_use]
1566    #[expect(clippy::too_many_arguments)]
1567    fn new<'a>(
1568        deployment_id: DeploymentId,
1569        config: &Arc<WebhookEndpointConfig>,
1570        engine: &Engine,
1571        clock_fn: Box<dyn ClockFn>,
1572        sleep: Arc<dyn Sleep>,
1573        db_pool: Arc<dyn DbPool>,
1574        fn_registry: Arc<dyn FunctionRegistry>,
1575        params: impl Iterator<Item = (&'a str, &'a str)>,
1576        execution_id: ExecutionIdTopLevel,
1577        request_span: Span,
1578        connection_drop_watcher: watch::Receiver<()>,
1579        server_termination_watcher: watch::Receiver<()>,
1580        stdout: Option<StdOutput>,
1581        stderr: Option<StdOutput>,
1582        run_id: RunId,
1583        logs_storage_config: Option<LogStrageConfig>,
1584    ) -> Store<WebhookEndpointCtx> {
1585        let mut wasi_ctx = WasiCtxBuilder::new();
1586        if let Some(stdout) = stdout {
1587            let stdout = LogStream::new(
1588                format!(
1589                    "[{component_id} {execution_id} stdout]",
1590                    component_id = config.component_id
1591                ),
1592                stdout,
1593            );
1594            wasi_ctx.stdout(stdout);
1595        }
1596        if let Some(stderr) = stderr {
1597            let stderr = LogStream::new(
1598                format!(
1599                    "[{component_id} {execution_id} stderr]",
1600                    component_id = config.component_id
1601                ),
1602                stderr,
1603            );
1604            wasi_ctx.stderr(stderr);
1605        }
1606        for env_var in config.env_vars.as_ref() {
1607            wasi_ctx.env(&env_var.key, &env_var.val);
1608        }
1609        if let Some(js_config) = &config.js_config {
1610            wasi_ctx.env("__OBELISK_JS_SOURCE__", &js_config.source);
1611            wasi_ctx.env("__OBELISK_JS_FILE_NAME__", &js_config.file_name);
1612            if !js_config.resolved_imports.is_empty() {
1613                let imports_json = serde_json::to_string(&js_config.resolved_imports)
1614                    .expect("resolved imports must be serializable");
1615                wasi_ctx.env("__OBELISK_RESOLVED_IMPORTS__", &imports_json);
1616            }
1617        }
1618
1619        // Generate fresh placeholders for this execution run
1620        let http_policy =
1621            crate::policy_builder::build_http_policy(&config.allowed_hosts, &mut wasi_ctx);
1622
1623        for (key, val) in params {
1624            wasi_ctx.env(key, val);
1625        }
1626        let wasi_ctx = wasi_ctx.build();
1627        let component_logger = ComponentLogger {
1628            span: request_span,
1629            execution_id: ExecutionId::TopLevel(execution_id),
1630            run_id,
1631            logs_storage_config,
1632        };
1633        // All child executions are part of the same join set.
1634        let ctx = WebhookEndpointCtx {
1635            clock_fn: clock_fn.clone_box(),
1636            sleep,
1637            db_pool,
1638            fn_registry,
1639            table: ResourceTable::new(),
1640            wasi_ctx,
1641            http_ctx: WasiHttpCtx::new(),
1642            version: None,
1643            component_id: config.component_id.clone(),
1644            deployment_id,
1645            next_join_set_idx: JOIN_SET_START_IDX,
1646            execution_id,
1647            component_logger: component_logger.clone(),
1648            subscription_interruption: config.subscription_interruption,
1649            connection_drop_watcher,
1650            server_termination_watcher,
1651            http_hooks: HttpHooks {
1652                clock_fn,
1653                http_client_traces: HttpClientTracesContainer::default(),
1654                http_policy,
1655                component_logger,
1656                config_section_hint: config.config_section_hint,
1657            },
1658        };
1659        let mut store = Store::new(engine, ctx);
1660
1661        // Set fuel.
1662        if let Some(fuel) = config.fuel {
1663            store
1664                .set_fuel(fuel)
1665                .expect("engine must have `consume_fuel` enabled");
1666        }
1667
1668        // Configure epoch callback before running the initialization to avoid interruption
1669        store.epoch_deadline_callback(|_store_ctx| Ok(UpdateDeadline::Yield(1)));
1670        store
1671    }
1672
1673    async fn close(
1674        self,
1675        original_result: wasmtime::Result<()>,
1676        assigned_fuel: Option<u64>,
1677    ) -> wasmtime::Result<()> {
1678        #[derive(Debug, thiserror::Error)]
1679        #[error("webhook {trap_kind}: {reason}")]
1680        struct WebhookTrap {
1681            reason: String,
1682            trap_kind: TrapKind,
1683            detail: Option<String>,
1684        }
1685
1686        let result = match &original_result {
1687            Ok(()) => SUPPORTED_RETURN_VALUE_OK_EMPTY,
1688            Err(err) => {
1689                let err = if let Some(trap) = err
1690                    .source()
1691                    .and_then(|source| source.downcast_ref::<wasmtime::Trap>())
1692                {
1693                    if *trap == wasmtime::Trap::OutOfFuel {
1694                        WebhookTrap {
1695                            reason: format!(
1696                                "total fuel consumed: {}",
1697                                assigned_fuel
1698                                    .expect("must have been set as it was the reason of trap")
1699                            ),
1700                            detail: None,
1701                            trap_kind: TrapKind::OutOfFuel,
1702                        }
1703                    } else {
1704                        WebhookTrap {
1705                            reason: trap.to_string(),
1706                            detail: Some(format!("{err:?}")),
1707                            trap_kind: TrapKind::Trap,
1708                        }
1709                    }
1710                } else {
1711                    WebhookTrap {
1712                        reason: err.to_string(),
1713                        trap_kind: TrapKind::HostFunctionError,
1714                        detail: Some(format!("{err:?}")),
1715                    }
1716                };
1717
1718                SupportedFunctionReturnValue::ExecutionFailure(FinishedExecutionFailure {
1719                    reason: Some(err.to_string()),
1720                    kind: ExecutionFailureKind::Uncategorized,
1721                    detail: err.detail,
1722                })
1723            }
1724        };
1725        if let Some(version) = self.version {
1726            let http_client_traces = Some(
1727                self.http_hooks
1728                    .http_client_traces
1729                    .into_iter()
1730                    .map(|(req, mut resp)| HttpClientTrace {
1731                        req,
1732                        resp: resp.try_recv().ok(),
1733                    })
1734                    .collect(),
1735            );
1736            self.db_pool
1737                .connection()
1738                .await?
1739                .append(
1740                    ExecutionId::TopLevel(self.execution_id),
1741                    version,
1742                    AppendRequest {
1743                        created_at: self.clock_fn.now(),
1744                        event: ExecutionRequest::Finished {
1745                            retval: result,
1746                            http_client_traces,
1747                        },
1748                    },
1749                )
1750                .await?;
1751        }
1752        original_result
1753    }
1754}
1755
1756impl log_activities::obelisk::log::log::Host for WebhookEndpointCtx {
1757    fn trace(&mut self, message: String) {
1758        self.component_logger.log(LogLevel::Trace, message);
1759    }
1760
1761    fn debug(&mut self, message: String) {
1762        self.component_logger.log(LogLevel::Debug, message);
1763    }
1764
1765    fn info(&mut self, message: String) {
1766        self.component_logger.log(LogLevel::Info, message);
1767    }
1768
1769    fn warn(&mut self, message: String) {
1770        self.component_logger.log(LogLevel::Warn, message);
1771    }
1772
1773    fn error(&mut self, message: String) {
1774        self.component_logger.log(LogLevel::Error, message);
1775    }
1776}
1777
1778impl WasiView for WebhookEndpointCtx {
1779    fn ctx(&mut self) -> WasiCtxView<'_> {
1780        WasiCtxView {
1781            ctx: &mut self.wasi_ctx,
1782            table: &mut self.table,
1783        }
1784    }
1785}
1786impl IoView for WebhookEndpointCtx {
1787    fn table(&mut self) -> &mut ResourceTable {
1788        &mut self.table
1789    }
1790}
1791impl WasiHttpView for WebhookEndpointCtx {
1792    fn http(&mut self) -> WasiHttpCtxView<'_> {
1793        WasiHttpCtxView {
1794            ctx: &mut self.http_ctx,
1795            table: &mut self.table,
1796            hooks: &mut self.http_hooks,
1797        }
1798    }
1799}
1800
/// Everything needed to serve one incoming webhook request.
///
/// The handler is consumed by `handle_request`; most fields are handed over to
/// `WebhookEndpointCtx::new` when the store for the matched route is built.
struct RequestHandler {
    /// Deployment the created context is associated with.
    deployment_id: DeploymentId,
    /// Engine used to create the per-request store.
    engine: Arc<Engine>,
    /// Clock handed to the per-request context.
    clock_fn: Box<dyn ClockFn>,
    /// Sleep implementation handed to the per-request context.
    sleep: Arc<dyn Sleep>,
    /// Database pool handed to the per-request context.
    db_pool: Arc<dyn DbPool>,
    /// Function registry handed to the per-request context.
    fn_registry: Arc<dyn FunctionRegistry>,
    /// Top-level execution id of this request; also recorded on the tracing span.
    execution_id: ExecutionIdTopLevel,
    /// Looks up the linked webhook instance by HTTP method and URI.
    router: Arc<MethodAwareRouter<WebhookEndpointInstanceLinked>>,
    /// Forwarded to the context, which stops waiting for a result when it changes.
    connection_drop_watcher: watch::Receiver<()>,
    /// Forwarded to the context, which stops waiting for a result when it changes.
    server_termination_watcher: watch::Receiver<()>,
    /// Passed to `build` on the matched instance.
    log_forwarder_sender: mpsc::Sender<LogInfoAppendRow>,
}
1814
1815fn respond(body: &str, status_code: StatusCode) -> hyper::Response<HyperOutgoingBody> {
1816    let body = UnsyncBoxBody::new(http_body_util::BodyExt::map_err(
1817        http_body_util::Full::new(Bytes::copy_from_slice(body.as_bytes())),
1818        |_| unreachable!(),
1819    ));
1820    hyper::Response::builder()
1821        .status(status_code)
1822        .body(body)
1823        .unwrap()
1824}
1825
1826impl RequestHandler {
1827    #[instrument(skip_all, name="incoming webhook request", fields(execution_id = %self.execution_id))]
1828    async fn handle_request(
1829        self,
1830        req: hyper::Request<hyper::body::Incoming>,
1831        max_inflight_requests: Option<Arc<tokio::sync::Semaphore>>,
1832    ) -> Result<hyper::Response<HyperOutgoingBody>, hyper::Error> {
1833        let http_request_guard = if let Some(http_request_semaphore) = &max_inflight_requests {
1834            http_request_semaphore.clone().try_acquire_owned().map(Some)
1835        } else {
1836            Ok(None)
1837        };
1838        let Ok(http_request_guard) = http_request_guard else {
1839            debug!(method = %req.method(), uri = %req.uri(), "Too many requests");
1840            return Ok::<_, hyper::Error>(respond("Out of permits", StatusCode::TOO_MANY_REQUESTS));
1841        };
1842
1843        let res = self
1844            .handle_request_inner(req, http_request_guard, Span::current())
1845            .await;
1846        match res {
1847            Ok(body) => Ok(body),
1848            Err(err) => {
1849                debug!("{err:?}");
1850                Ok(match err {
1851                    HandleRequestError::IncomingRequestError(err) => respond(
1852                        &format!("Incoming request error: {err}"),
1853                        StatusCode::BAD_REQUEST,
1854                    ),
1855                    HandleRequestError::ResponseCreationError(err) => respond(
1856                        &format!("Cannot create response: {err}"),
1857                        StatusCode::INTERNAL_SERVER_ERROR,
1858                    ),
1859                    HandleRequestError::InstantiationError(err) => respond(
1860                        &format!("Cannot instantiate: {err}"),
1861                        StatusCode::SERVICE_UNAVAILABLE,
1862                    ),
1863                    HandleRequestError::ErrorCode(code) => respond(
1864                        &format!("Error code: {code}"),
1865                        StatusCode::INTERNAL_SERVER_ERROR,
1866                    ),
1867                    HandleRequestError::ExecutionError(_) => {
1868                        respond("Component Error", StatusCode::INTERNAL_SERVER_ERROR)
1869                    }
1870                    HandleRequestError::RouteNotFound => {
1871                        respond("Route not found", StatusCode::NOT_FOUND)
1872                    }
1873                    HandleRequestError::Timeout => respond("Timeout", StatusCode::REQUEST_TIMEOUT),
1874                    HandleRequestError::InstanceLimitReached => {
1875                        respond("Instance limit reached", StatusCode::SERVICE_UNAVAILABLE)
1876                    }
1877                })
1878            }
1879        }
1880    }
1881
1882    async fn handle_request_inner(
1883        self,
1884        req: hyper::Request<hyper::body::Incoming>,
1885        http_request_guard: Option<OwnedSemaphorePermit>,
1886        request_span: Span,
1887    ) -> Result<hyper::Response<HyperOutgoingBody>, HandleRequestError> {
1888        #[derive(Debug, thiserror::Error)]
1889        #[error("timeout")]
1890        struct TimeoutError;
1891
1892        if let Some(instance_match) = self.router.find(req.method(), req.uri()) {
1893            let found_instance = instance_match.handler();
1894            let found_instance = found_instance.build(&self.log_forwarder_sender);
1895            let run_id = RunId::generate();
1896            let stdout = found_instance.stdout.as_ref().map(|stdoutput| {
1897                stdoutput.build(&ExecutionId::TopLevel(self.execution_id), run_id)
1898            });
1899            let stderr = found_instance.stderr.as_ref().map(|stdoutput| {
1900                stdoutput.build(&ExecutionId::TopLevel(self.execution_id), run_id)
1901            });
1902            let (sender, receiver) = tokio::sync::oneshot::channel();
1903            let mut store = WebhookEndpointCtx::new(
1904                self.deployment_id,
1905                &found_instance.config,
1906                &self.engine,
1907                self.clock_fn,
1908                self.sleep,
1909                self.db_pool,
1910                self.fn_registry,
1911                instance_match.params().iter(),
1912                self.execution_id,
1913                request_span.clone(),
1914                self.connection_drop_watcher,
1915                self.server_termination_watcher,
1916                stdout,
1917                stderr,
1918                run_id,
1919                found_instance.logs_storage_config.clone(),
1920            );
1921            let req = store
1922                .data_mut()
1923                .http()
1924                .new_incoming_request(Scheme::Http, req)
1925                .map_err(|err| HandleRequestError::IncomingRequestError(err.into()))?;
1926            let out = store
1927                .data_mut()
1928                .http()
1929                .new_response_outparam(sender)
1930                .map_err(|err| HandleRequestError::ResponseCreationError(err.into()))?;
1931            let proxy = found_instance
1932                .proxy_pre
1933                .instantiate_async(&mut store)
1934                .await
1935                .map_err(|err| HandleRequestError::InstantiationError(err.into()))?;
1936
1937            let task = tokio::task::spawn({
1938                let assigned_fuel = found_instance.config.fuel;
1939                async move {
1940                    let _http_request_guard = http_request_guard;
1941                    let result = proxy
1942                        .wasi_http_incoming_handler()
1943                        .call_handle(&mut store, req, out)
1944                        .await
1945                        .inspect_err(|err| debug!("Webhook instance returned error: {err:?}"));
1946                    let ctx = store.into_data();
1947                    ctx.close(result, assigned_fuel).await
1948                }
1949                .instrument(request_span)
1950            });
1951            match receiver.await {
1952                Ok(Ok(resp)) => {
1953                    trace!("Streaming the response");
1954                    Ok(resp)
1955                }
1956                Ok(Err(err)) => {
1957                    debug!("Webhook instance sent error code {err:?}");
1958                    Err(HandleRequestError::ErrorCode(err))
1959                }
1960                Err(_recv_err) => {
1961                    // An error in the receiver (`RecvError`) only indicates that the
1962                    // task exited before a response was sent (i.e., the sender was
1963                    // dropped); it does not describe the underlying cause of failure.
1964                    // Instead we retrieve and propagate the error from inside the task
1965                    // which should more clearly tell the user what went wrong. Note
1966                    // that we assume the task has already exited at this point so the
1967                    // `await` should resolve immediately.
1968                    let err = match task.await {
1969                        Ok(r) => {
1970                            r.expect_err("if the receiver has an error, the task must have failed")
1971                        } //
1972                        Err(e) => e.into(), // e.g. Panic
1973                    };
1974                    if err.downcast_ref::<TimeoutError>().is_some() {
1975                        Err(HandleRequestError::Timeout)
1976                    } else {
1977                        info!("Webhook task ended with ExecutionError - {err:?}");
1978                        Err(HandleRequestError::ExecutionError(err.into()))
1979                    }
1980                }
1981            }
1982        } else {
1983            Err(HandleRequestError::RouteNotFound)
1984        }
1985    }
1986}
/// Reasons why an incoming webhook request could not be answered with the
/// response produced by the component.
#[derive(Debug, thiserror::Error)]
pub enum HandleRequestError {
    /// The incoming request could not be registered with the HTTP context.
    #[error("incoming request error: {0}")]
    IncomingRequestError(StdError),
    /// The response out-parameter could not be created.
    #[error("response creation error: {0}")]
    ResponseCreationError(StdError),
    /// The webhook component could not be instantiated.
    #[error("instantiation error: {0}")]
    InstantiationError(StdError),
    /// The component reported an error code instead of a response.
    #[error("error code: {0}")]
    ErrorCode(wasmtime_wasi_http::p2::bindings::http::types::ErrorCode),
    /// The task running the component ended without delivering a response.
    #[error("execution error: {0}")]
    ExecutionError(StdError),
    /// No route matched the request's method and URI.
    #[error("route not found")]
    RouteNotFound,
    /// NOTE(review): not constructed in the visible part of this file — confirm where it is produced.
    #[error("instance limit reached")]
    InstanceLimitReached,
    /// The task ended with a timeout error.
    #[error("timeout")]
    Timeout,
}
2006
2007fn execution_id_into_val(execution_id: &ExecutionId) -> Val {
2008    Val::Record(vec![(
2009        "id".to_string(),
2010        Val::String(execution_id.to_string()),
2011    )])
2012}
2013
2014#[cfg(test)]
2015pub(crate) mod tests {
2016    use crate::{
2017        RunnableComponent,
2018        engines::{EngineConfig, Engines},
2019    };
2020
2021    use super::MethodAwareRouter;
2022    use assert_matches::assert_matches;
2023    use concepts::ComponentType;
2024    use hyper::{Method, Uri};
2025
2026    pub(crate) fn compile_webhook(wasm_path: &str) -> RunnableComponent {
2027        let engine = Engines::get_webhook_engine(EngineConfig::on_demand_testing()).unwrap();
2028        RunnableComponent::new(wasm_path, &engine, ComponentType::WebhookEndpoint).unwrap()
2029    }
2030
2031    pub(crate) mod fibo {
2032        use super::*;
2033        use crate::activity::activity_worker::test::compile_activity;
2034        use crate::activity::activity_worker::tests::new_activity_fibo;
2035        use crate::activity::cancel_registry::CancelRegistry;
2036        use crate::engines::{EngineConfig, Engines};
2037        use crate::http_hooks::ConfigSectionHint;
2038        use crate::http_request_policy::{AllowedHostConfig, HostPattern, MethodsPattern};
2039        use crate::std_output_stream::StdOutputConfig;
2040        use crate::testing_fn_registry::TestingFnRegistry;
2041        use crate::webhook::webhook_trigger::{
2042            self, WebhookEndpointCompiled, WebhookEndpointConfig, WebhookServerError,
2043            WebhookServerState,
2044        };
2045        use crate::workflow::workflow_worker::JoinNextBlockingStrategy;
2046        use crate::workflow::workflow_worker::test::compile_workflow;
2047        use crate::workflow::workflow_worker::tests::{FIBOA_WORKFLOW_FFQN, new_workflow_fibo};
2048        use concepts::component_id::ComponentDigest;
2049        use concepts::prefixed_ulid::{DEPLOYMENT_ID_DUMMY, RunId};
2050        use concepts::storage::DbPoolCloseable;
2051        use concepts::time::ClockFn;
2052        use concepts::time::TokioSleep;
2053        use concepts::{ComponentId, ComponentType, Params, StrVariant};
2054        use concepts::{ExecutionId, storage::DbPool};
2055        use db_tests::{Database, DbGuard, DbPoolCloseableWrapper};
2056        use executor::executor::{ExecTask, LockingStrategy};
2057        use rstest::rstest;
2058        use serde_json::json;
2059        use std::net::SocketAddr;
2060        use std::str::FromStr;
2061        use std::sync::Arc;
2062        use std::time::Duration;
2063        use test_db_macro::expand_enum_database;
2064        use test_utils::sim_clock::SimClock;
2065        use tokio::net::TcpListener;
2066        use tokio::sync::{mpsc, watch};
2067        use tracing::info;
2068        use utils::sha256sum::calculate_sha256_file;
2069
        /// Test fixture bundling a running webhook server with the executors,
        /// database handles and clock it was set up with.
        ///
        /// Fields marked `#[expect(dead_code)]` are never read.
        /// NOTE(review): presumably they are stored only so the values live as
        /// long as the fixture — confirm.
        struct SetUpFiboWebhook {
            /// Join set holding the spawned webhook server task.
            #[expect(dead_code)]
            set: tokio::task::JoinSet<Result<(), WebhookServerError>>,
            /// Guard returned by the database set-up.
            #[expect(dead_code)]
            guard: DbGuard,
            /// Database pool shared with the server and the executors.
            db_pool: Arc<dyn DbPool>,
            /// Address the server's TCP listener is bound to.
            server_addr: SocketAddr,
            /// Executor task for the fibo activity.
            activity_exec: ExecTask,
            /// Executor task for the fibo workflow.
            workflow_exec: ExecTask,
            /// Simulated clock handed to the server and the executors.
            sim_clock: SimClock,
            /// Closeable handle returned by the database set-up.
            db_close: DbPoolCloseableWrapper,
            /// Sending half of the server-termination watch channel.
            #[expect(dead_code)]
            server_termination_sender: watch::Sender<()>,
            /// Sending half of the server-state watch channel.
            #[expect(dead_code)]
            wh_server_state_sender: watch::Sender<Arc<WebhookServerState>>,
        }
2086
2087        impl SetUpFiboWebhook {
2088            async fn new(
2089                db: db_tests::Database,
2090                locking_strategy: LockingStrategy,
2091            ) -> SetUpFiboWebhook {
2092                let addr = SocketAddr::from(([127, 0, 0, 1], 0));
2093                let sim_clock = SimClock::default();
2094                let (guard, db_pool, db_close) = db.set_up().await;
2095                let activity_exec = new_activity_fibo(
2096                    db_pool.clone(),
2097                    sim_clock.clone_box(),
2098                    TokioSleep,
2099                    locking_strategy,
2100                )
2101                .await;
2102
2103                let (workflow_runnable, workflow_component_id) = compile_workflow(
2104                    test_programs_fibo_workflow_builder::TEST_PROGRAMS_FIBO_WORKFLOW,
2105                )
2106                .await;
2107
2108                let fn_registry = TestingFnRegistry::new_from_components(vec![
2109                    compile_activity(
2110                        test_programs_fibo_activity_builder::TEST_PROGRAMS_FIBO_ACTIVITY,
2111                    )
2112                    .await,
2113                    (workflow_runnable, workflow_component_id),
2114                ]);
2115                let cancel_registry = CancelRegistry::new();
2116                let engine =
2117                    Engines::get_webhook_engine(EngineConfig::on_demand_testing()).unwrap();
2118                let workflow_exec = new_workflow_fibo(
2119                    db_pool.clone(),
2120                    sim_clock.clone_box(),
2121                    JoinNextBlockingStrategy::Interrupt,
2122                    &fn_registry,
2123                    cancel_registry,
2124                    locking_strategy,
2125                )
2126                .await;
2127                let (db_forwarder_sender, _) = mpsc::channel(1);
2128                let wasm_file = test_programs_fibo_webhook_builder::TEST_PROGRAMS_FIBO_WEBHOOK;
2129                let router = {
2130                    let runnable_component =
2131                        RunnableComponent::new(wasm_file, &engine, ComponentType::WebhookEndpoint)
2132                            .unwrap();
2133                    let instance = WebhookEndpointCompiled::new(
2134                        WebhookEndpointConfig {
2135                            component_id: ComponentId::new(
2136                                ComponentType::WebhookEndpoint,
2137                                StrVariant::empty(),
2138                                ComponentDigest(calculate_sha256_file(wasm_file).await.unwrap().0),
2139                            )
2140                            .unwrap(),
2141                            forward_stdout: Some(StdOutputConfig::Stdout),
2142                            forward_stderr: Some(StdOutputConfig::Stdout),
2143                            env_vars: Arc::from([]),
2144                            fuel: None,
2145                            backtrace_persist: false,
2146                            subscription_interruption: None,
2147                            logs_store_min_level: None,
2148                            allowed_hosts: Arc::from([]),
2149                            js_config: None,
2150                            config_section_hint: ConfigSectionHint::WebhookEndpointWasm,
2151                        },
2152                        runnable_component,
2153                    )
2154                    .unwrap()
2155                    .link(&engine, fn_registry.as_ref())
2156                    .unwrap();
2157                    let mut router = MethodAwareRouter::default();
2158                    router.add(Some(Method::GET), "/fibo/:N/:ITERATIONS", instance);
2159                    router
2160                };
2161                let tcp_listener = TcpListener::bind(addr).await.unwrap();
2162                let server_addr = tcp_listener.local_addr().unwrap();
2163                info!("Listening on port {}", server_addr.port());
2164                let (server_termination_sender, server_termination_watcher) = watch::channel(());
2165                let initial_state = Arc::new(webhook_trigger::WebhookServerState {
2166                    deployment_id: DEPLOYMENT_ID_DUMMY,
2167                    router: Arc::new(router),
2168                    fn_registry,
2169                });
2170                let (wh_server_state_sender, wh_server_state_watcher) =
2171                    watch::channel(initial_state);
2172                let mut set = tokio::task::JoinSet::new();
2173                set.spawn(webhook_trigger::server(
2174                    "test".to_string(),
2175                    tcp_listener,
2176                    engine,
2177                    wh_server_state_watcher,
2178                    db_forwarder_sender,
2179                    db_pool.clone(),
2180                    sim_clock.clone_box(),
2181                    Arc::new(TokioSleep),
2182                    None,
2183                    server_termination_watcher,
2184                ));
2185                SetUpFiboWebhook {
2186                    set,
2187                    guard,
2188                    db_pool,
2189                    server_addr,
2190                    activity_exec,
2191                    workflow_exec,
2192                    sim_clock,
2193                    db_close,
2194                    server_termination_sender,
2195                    wh_server_state_sender,
2196                }
2197            }
2198
2199            async fn fibo_fetch(
2200                server_addr: &str,
2201                n: u8,
2202                iterations: u32,
2203                expected_status_code: u16,
2204            ) -> String {
2205                let resp = reqwest::get(format!("http://{server_addr}/fibo/{n}/{iterations}"))
2206                    .await
2207                    .unwrap();
2208                assert_eq!(resp.status().as_u16(), expected_status_code);
2209                resp.text().await.unwrap()
2210            }
2211
2212            async fn close(self) {
2213                self.db_close.close().await;
2214            }
2215        }
2216
2217        #[rstest]
2218        #[tokio::test]
2219        async fn hardcoded_result_should_work(
2220            #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
2221            locking_strategy: LockingStrategy,
2222        ) {
2223            test_utils::set_up();
2224            let fibo_webhook_harness =
2225                SetUpFiboWebhook::new(Database::Sqlite, locking_strategy).await;
2226            let server_addr = fibo_webhook_harness.server_addr.to_string();
2227            assert_eq!(
2228                "fiboa(1, 0) = hardcoded: 1",
2229                SetUpFiboWebhook::fibo_fetch(&server_addr, 1, 0, 200).await
2230            );
2231        }
2232
2233        #[expand_enum_database]
2234        #[rstest]
2235        #[tokio::test]
2236        async fn direct_call_should_work(
2237            db: Database,
2238            #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
2239            locking_strategy: LockingStrategy,
2240        ) {
2241            test_utils::set_up();
2242            let fibo_webhook_harness = SetUpFiboWebhook::new(db, locking_strategy).await;
2243            let server_addr = fibo_webhook_harness.server_addr.to_string();
2244            let fetch_task =
2245                tokio::spawn(
2246                    async move { SetUpFiboWebhook::fibo_fetch(&server_addr, 2, 1, 200).await },
2247                );
2248
2249            let now = fibo_webhook_harness.sim_clock.now();
2250            while fibo_webhook_harness
2251                .workflow_exec
2252                .tick_test_await(now, RunId::generate())
2253                .await
2254                .is_empty()
2255            {
2256                tokio::time::sleep(Duration::from_millis(100)).await;
2257            }
2258            // At this point the workflow was started successfuly.
2259            assert_eq!(
2260                1,
2261                fibo_webhook_harness
2262                    .activity_exec
2263                    .tick_test_await(now, RunId::generate())
2264                    .await
2265                    .len()
2266            );
2267            // finish workflow
2268            assert_eq!(
2269                1,
2270                fibo_webhook_harness
2271                    .workflow_exec
2272                    .tick_test_await(now, RunId::generate())
2273                    .await
2274                    .len()
2275            );
2276            let res = fetch_task.await.unwrap();
2277            assert_eq!("fiboa(2, 1) = direct call: 1", res);
2278        }
2279
2280        #[expand_enum_database]
2281        #[rstest]
2282        #[tokio::test]
2283        async fn scheduling_should_work(
2284            db: db_tests::Database,
2285            #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
2286            locking_strategy: LockingStrategy,
2287        ) {
2288            test_utils::set_up();
2289            let fibo_webhook_harness = SetUpFiboWebhook::new(db, locking_strategy).await;
2290            let server_addr = fibo_webhook_harness.server_addr.to_string();
2291            let n = 10;
2292            let iterations = 1;
2293            let resp = SetUpFiboWebhook::fibo_fetch(&server_addr, n, iterations, 200).await;
2294
2295            let execution_id = resp
2296                .strip_prefix(&format!("fiboa({n}, {iterations}) = scheduled: "))
2297                .unwrap();
2298            let execution_id = ExecutionId::from_str(execution_id).unwrap();
2299            let conn = fibo_webhook_harness.db_pool.connection().await.unwrap();
2300            let create_req = conn.get_create_request(&execution_id).await.unwrap();
2301            assert_eq!(FIBOA_WORKFLOW_FFQN, create_req.ffqn);
2302            let expected_params = Params::from_json_values_test(vec![json!(10), json!(1)]);
2303            assert_eq!(
2304                serde_json::to_string(&expected_params).unwrap(),
2305                serde_json::to_string(&create_req.params).unwrap()
2306            );
2307        }
2308
2309        #[rstest]
2310        #[tokio::test]
2311        async fn test_routing_error_handling(
2312            #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
2313            locking_strategy: LockingStrategy,
2314        ) {
2315            test_utils::set_up();
2316            let fibo_webhook_harness =
2317                SetUpFiboWebhook::new(Database::Sqlite, locking_strategy).await;
2318            // Check wrong URL
2319            let resp = reqwest::get(format!(
2320                "http://{}/unknown",
2321                &fibo_webhook_harness.server_addr
2322            ))
2323            .await
2324            .unwrap();
2325            assert_eq!(resp.status().as_u16(), 404);
2326            assert_eq!("Route not found", resp.text().await.unwrap());
2327            // Check panicking inside WASM before response is streamed
2328            let resp = reqwest::get(format!(
2329                "http://{}/fibo/0/1",
2330                &fibo_webhook_harness.server_addr
2331            ))
2332            .await
2333            .unwrap();
2334            assert_eq!(resp.status().as_u16(), 500);
2335            assert_eq!("Component Error", resp.text().await.unwrap());
2336            fibo_webhook_harness.close().await;
2337        }
2338
2339        #[expand_enum_database]
2340        #[rstest]
2341        #[tokio::test]
2342        async fn http_client_traces_should_be_captured(
2343            db: db_tests::Database,
2344            #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
2345            locking_strategy: LockingStrategy,
2346        ) {
2347            use concepts::storage::ExecutionRequest;
2348            use concepts::storage::http_client_trace::{
2349                HttpClientTrace, RequestTrace, ResponseTrace,
2350            };
2351            use wiremock::{
2352                Mock, MockServer, ResponseTemplate,
2353                matchers::{method, path},
2354            };
2355            const BODY: &str = "webhook-http-trace-test-body";
2356            test_utils::set_up();
2357            let sim_clock = SimClock::default();
2358            let (db_guard, db_pool, db_close) = db.set_up().await;
2359
2360            // Set up mock HTTP server
2361            let mock_listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
2362            let mock_address = mock_listener.local_addr().unwrap();
2363            let mock_allowed_host = format!("http://127.0.0.1:{port}", port = mock_address.port());
2364            let mock_uri = format!("{mock_allowed_host}/");
2365            let mock_server = MockServer::builder().listener(mock_listener).start().await;
2366            Mock::given(method("GET"))
2367                .and(path("/"))
2368                .respond_with(ResponseTemplate::new(200).set_body_string(BODY))
2369                .expect(1)
2370                .mount(&mock_server)
2371                .await;
2372
2373            // Set up fibo workflow/activity workers (needed for schedule call)
2374            let activity_exec = crate::activity::activity_worker::tests::new_activity_fibo(
2375                db_pool.clone(),
2376                sim_clock.clone_box(),
2377                TokioSleep,
2378                locking_strategy,
2379            )
2380            .await;
2381
2382            let (workflow_runnable, workflow_component_id) =
2383                compile_workflow(test_programs_fibo_workflow_builder::TEST_PROGRAMS_FIBO_WORKFLOW)
2384                    .await;
2385
2386            let fn_registry =
2387                crate::testing_fn_registry::TestingFnRegistry::new_from_components(vec![
2388                    crate::activity::activity_worker::test::compile_activity(
2389                        test_programs_fibo_activity_builder::TEST_PROGRAMS_FIBO_ACTIVITY,
2390                    )
2391                    .await,
2392                    (workflow_runnable, workflow_component_id),
2393                ]);
2394
2395            let engine = crate::engines::Engines::get_webhook_engine(
2396                crate::engines::EngineConfig::on_demand_testing(),
2397            )
2398            .unwrap();
2399
2400            // Build the HTTP GET webhook
2401            let (db_forwarder_sender, _) = mpsc::channel(1);
2402            let wasm_file = test_programs_http_get_webhook_builder::TEST_PROGRAMS_HTTP_GET_WEBHOOK;
2403            let router = {
2404                let runnable_component =
2405                    RunnableComponent::new(wasm_file, &engine, ComponentType::WebhookEndpoint)
2406                        .unwrap();
2407                let instance = WebhookEndpointCompiled::new(
2408                    WebhookEndpointConfig {
2409                        component_id: ComponentId::new(
2410                            ComponentType::WebhookEndpoint,
2411                            StrVariant::empty(),
2412                            concepts::component_id::ComponentDigest(
2413                                utils::sha256sum::calculate_sha256_file(wasm_file)
2414                                    .await
2415                                    .unwrap()
2416                                    .0,
2417                            ),
2418                        )
2419                        .unwrap(),
2420                        forward_stdout: Some(StdOutputConfig::Stdout),
2421                        forward_stderr: Some(StdOutputConfig::Stdout),
2422                        env_vars: Arc::from([]),
2423                        fuel: None,
2424                        backtrace_persist: false,
2425                        subscription_interruption: None,
2426                        logs_store_min_level: None,
2427                        allowed_hosts: Arc::from(vec![AllowedHostConfig {
2428                            pattern: HostPattern::parse_with_methods(
2429                                &mock_allowed_host,
2430                                MethodsPattern::AllMethods,
2431                            )
2432                            .unwrap(),
2433                            secret_env_mappings: Vec::new(),
2434                            replace_in: hashbrown::HashSet::new(),
2435                        }]),
2436                        js_config: None,
2437                        config_section_hint: ConfigSectionHint::WebhookEndpointWasm,
2438                    },
2439                    runnable_component,
2440                )
2441                .unwrap()
2442                .link(&engine, fn_registry.as_ref())
2443                .unwrap();
2444                let mut router = MethodAwareRouter::default();
2445                router.add(Some(Method::GET), "/http-get/:PORT", instance);
2446                router
2447            };
2448
2449            let tcp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2450            let server_addr = tcp_listener.local_addr().unwrap();
2451            info!("Listening on port {}", server_addr.port());
2452            let (_server_termination_sender, server_termination_watcher) = watch::channel(());
2453            let (_wh_server_state_sender, wh_server_state_watcher) =
2454                watch::channel(Arc::new(WebhookServerState {
2455                    deployment_id: DEPLOYMENT_ID_DUMMY,
2456                    router: Arc::new(router),
2457                    fn_registry,
2458                }));
2459            let mut set = tokio::task::JoinSet::new();
2460            set.spawn(webhook_trigger::server(
2461                "test".to_string(),
2462                tcp_listener,
2463                engine,
2464                wh_server_state_watcher,
2465                db_forwarder_sender,
2466                db_pool.clone(),
2467                sim_clock.clone_box(),
2468                Arc::new(TokioSleep),
2469                None,
2470                server_termination_watcher,
2471            ));
2472
2473            // Send request to webhook with mock server port as route param
2474            let mock_port = mock_address.port();
2475            let resp = reqwest::get(format!("http://{server_addr}/http-get/{mock_port}"))
2476                .await
2477                .unwrap();
2478            assert_eq!(resp.status().as_u16(), 200);
2479            let resp_body = resp.text().await.unwrap();
2480            let mut lines = resp_body.lines();
2481            assert_eq!(Some(BODY), lines.next());
2482            let child_execution_id_str = lines.next().expect("response must contain execution id");
2483            let child_execution_id = ExecutionId::from_str(child_execution_id_str).unwrap();
2484
2485            // Find the webhook execution via the child's scheduled_by
2486            let conn = db_pool.connection().await.unwrap();
2487            let create_req = conn.get_create_request(&child_execution_id).await.unwrap();
2488            let webhook_exec_id = create_req
2489                .scheduled_by
2490                .expect("child must have scheduled_by set");
2491
2492            // Poll until the Finished event is persisted
2493            let finished_event = loop {
2494                let exec_log = conn.get(&webhook_exec_id).await.unwrap();
2495                if let ExecutionRequest::Finished {
2496                    http_client_traces, ..
2497                } = &exec_log.last_event().event
2498                {
2499                    break http_client_traces.clone();
2500                }
2501                tokio::time::sleep(Duration::from_millis(10)).await;
2502            };
2503
2504            let http_client_traces = finished_event
2505                .as_ref()
2506                .expect("http_client_traces must be Some");
2507            assert_eq!(1, http_client_traces.len());
2508            let trace = &http_client_traces[0];
2509            assert_matches!(
2510                trace,
2511                HttpClientTrace {
2512                    req: RequestTrace {
2513                        method,
2514                        uri,
2515                        sent_at: _,
2516                    },
2517                    resp: Some(ResponseTrace {
2518                        status: Ok(200),
2519                        finished_at: _,
2520                    }),
2521                } => {
2522                    assert_eq!("GET", method);
2523                    assert_eq!(&mock_uri, uri);
2524                }
2525            );
2526
2527            drop(conn);
2528            drop(activity_exec);
2529            drop(set);
2530            drop(db_guard);
2531            db_close.close().await;
2532        }
2533
2534        #[tokio::test]
2535        async fn http_get_denied_host() {
2536            use wiremock::{
2537                Mock, MockServer, ResponseTemplate,
2538                matchers::{method, path},
2539            };
2540            test_utils::set_up();
2541            let sim_clock = SimClock::default();
2542            let (db_guard, db_pool, db_close) = Database::Sqlite.set_up().await;
2543
2544            // Set up mock HTTP server that the webhook will try to call
2545            let mock_listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
2546            let mock_address = mock_listener.local_addr().unwrap();
2547            let mock_server = MockServer::builder().listener(mock_listener).start().await;
2548            Mock::given(method("GET"))
2549                .and(path("/"))
2550                .respond_with(ResponseTemplate::new(200).set_body_string("should-not-reach"))
2551                .expect(0) // Should NOT be called since host is not allowed
2552                .mount(&mock_server)
2553                .await;
2554
2555            // Set up activity/workflow workers
2556            let activity_exec = crate::activity::activity_worker::tests::new_activity_fibo(
2557                db_pool.clone(),
2558                sim_clock.clone_box(),
2559                TokioSleep,
2560                LockingStrategy::ByComponentDigest,
2561            )
2562            .await;
2563
2564            let (workflow_runnable, workflow_component_id) =
2565                compile_workflow(test_programs_fibo_workflow_builder::TEST_PROGRAMS_FIBO_WORKFLOW)
2566                    .await;
2567
2568            let fn_registry =
2569                crate::testing_fn_registry::TestingFnRegistry::new_from_components(vec![
2570                    crate::activity::activity_worker::test::compile_activity(
2571                        test_programs_fibo_activity_builder::TEST_PROGRAMS_FIBO_ACTIVITY,
2572                    )
2573                    .await,
2574                    (workflow_runnable, workflow_component_id),
2575                ]);
2576
2577            let engine = crate::engines::Engines::get_webhook_engine(
2578                crate::engines::EngineConfig::on_demand_testing(),
2579            )
2580            .unwrap();
2581
2582            // Build the HTTP GET webhook with NO allowed hosts
2583            let (db_forwarder_sender, _) = mpsc::channel(1);
2584            let wasm_file = test_programs_http_get_webhook_builder::TEST_PROGRAMS_HTTP_GET_WEBHOOK;
2585            let router = {
2586                let runnable_component =
2587                    RunnableComponent::new(wasm_file, &engine, ComponentType::WebhookEndpoint)
2588                        .unwrap();
2589                let instance = WebhookEndpointCompiled::new(
2590                    WebhookEndpointConfig {
2591                        component_id: ComponentId::new(
2592                            ComponentType::WebhookEndpoint,
2593                            StrVariant::empty(),
2594                            concepts::component_id::ComponentDigest(
2595                                utils::sha256sum::calculate_sha256_file(wasm_file)
2596                                    .await
2597                                    .unwrap()
2598                                    .0,
2599                            ),
2600                        )
2601                        .unwrap(),
2602                        forward_stdout: Some(StdOutputConfig::Stdout),
2603                        forward_stderr: Some(StdOutputConfig::Stdout),
2604                        env_vars: Arc::from([]),
2605                        fuel: None,
2606                        backtrace_persist: false,
2607                        subscription_interruption: None,
2608                        logs_store_min_level: None,
2609                        allowed_hosts: Arc::from([]), // NO allowed hosts - request should be denied
2610                        js_config: None,
2611                        config_section_hint: ConfigSectionHint::WebhookEndpointWasm,
2612                    },
2613                    runnable_component,
2614                )
2615                .unwrap()
2616                .link(&engine, fn_registry.as_ref())
2617                .unwrap();
2618                let mut router = MethodAwareRouter::default();
2619                router.add(Some(Method::GET), "/http-get/:PORT", instance);
2620                router
2621            };
2622
2623            let tcp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2624            let server_addr = tcp_listener.local_addr().unwrap();
2625            info!("Listening on port {}", server_addr.port());
2626            let (_server_termination_sender, server_termination_watcher) = watch::channel(());
2627            let (_wh_server_state_sender, wh_server_state_watcher) =
2628                watch::channel(Arc::new(WebhookServerState {
2629                    deployment_id: DEPLOYMENT_ID_DUMMY,
2630                    router: Arc::new(router),
2631                    fn_registry,
2632                }));
2633            let mut set = tokio::task::JoinSet::new();
2634            set.spawn(webhook_trigger::server(
2635                "test".to_string(),
2636                tcp_listener,
2637                engine,
2638                wh_server_state_watcher,
2639                db_forwarder_sender,
2640                db_pool.clone(),
2641                sim_clock.clone_box(),
2642                Arc::new(TokioSleep),
2643                None,
2644                server_termination_watcher,
2645            ));
2646
2647            // Send request to webhook
2648            let mock_port = mock_address.port();
2649            let resp = reqwest::get(format!("http://{server_addr}/http-get/{mock_port}"))
2650                .await
2651                .unwrap();
2652            // The webhook should return 500 because the HTTP request was denied
2653            assert_eq!(resp.status().as_u16(), 500);
2654            let body = resp.text().await.unwrap();
2655            assert_eq!("Component Error", body);
2656
2657            // Verify mock was not called
2658            mock_server.verify().await;
2659
2660            drop(activity_exec);
2661            drop(set);
2662            drop(db_guard);
2663            db_close.close().await;
2664        }
2665    }
2666
2667    pub(crate) mod js_runtime {
2668        use crate::RunnableComponent;
2669        use crate::engines::{EngineConfig, Engines};
2670        use crate::std_output_stream::StdOutputConfig;
2671        use crate::testing_fn_registry::TestingFnRegistry;
2672        use crate::webhook::webhook_trigger::{
2673            self, MethodAwareRouter, WebhookEndpointCompiled, WebhookEndpointConfig,
2674            WebhookEndpointJsConfig, WebhookServerError, WebhookServerState,
2675        };
2676        use concepts::component_id::ComponentDigest;
2677        use concepts::prefixed_ulid::DEPLOYMENT_ID_DUMMY;
2678        use concepts::time::{ClockFn, TokioSleep};
2679        use concepts::{ComponentId, ComponentType, StrVariant};
2680        use std::collections::HashMap;
2681        use std::net::SocketAddr;
2682        use std::sync::Arc;
2683        use test_utils::sim_clock::SimClock;
2684        use tokio::net::TcpListener;
2685        use tokio::sync::{mpsc, watch};
2686        use tracing::info;
2687        use utils::sha256sum::calculate_sha256_file;
2688
2689        struct WatchGuard {
2690            #[expect(dead_code)]
2691            server_termination_sender: watch::Sender<()>,
2692            #[expect(dead_code)]
2693            wh_server_state_sender: watch::Sender<Arc<WebhookServerState>>,
2694        }
2695
2696        async fn start_js_webhook_server(
2697            source: &str,
2698        ) -> (
2699            tokio::task::JoinSet<Result<(), WebhookServerError>>,
2700            SocketAddr,
2701            WatchGuard,
2702        ) {
2703            let sim_clock = SimClock::default();
2704            let (_guard, db_pool, _db_close) = db_tests::Database::Sqlite.set_up().await;
2705            let fn_registry = TestingFnRegistry::new_from_components(vec![]);
2706            let engine = Engines::get_webhook_engine(EngineConfig::on_demand_testing()).unwrap();
2707            let (db_forwarder_sender, _) = mpsc::channel(1);
2708            let wasm_file = webhook_js_runtime_builder::WEBHOOK_JS_RUNTIME;
2709            let router = {
2710                let runnable_component =
2711                    RunnableComponent::new(wasm_file, &engine, ComponentType::WebhookEndpoint)
2712                        .unwrap();
2713                let instance = WebhookEndpointCompiled::new(
2714                    WebhookEndpointConfig {
2715                        component_id: ComponentId::new(
2716                            ComponentType::WebhookEndpoint,
2717                            StrVariant::empty(),
2718                            ComponentDigest(calculate_sha256_file(wasm_file).await.unwrap().0),
2719                        )
2720                        .unwrap(),
2721                        forward_stdout: Some(StdOutputConfig::Stdout),
2722                        forward_stderr: Some(StdOutputConfig::Stdout),
2723                        env_vars: Arc::from([]),
2724                        fuel: None,
2725                        backtrace_persist: false,
2726                        subscription_interruption: None,
2727                        logs_store_min_level: None,
2728                        allowed_hosts: Arc::from([]),
2729                        js_config: Some(WebhookEndpointJsConfig {
2730                            source: source.to_string(),
2731                            file_name: String::new(),
2732                            resolved_imports: HashMap::new(),
2733                        }),
2734                        config_section_hint:
2735                            crate::http_hooks::ConfigSectionHint::WebhookEndpointJs,
2736                    },
2737                    runnable_component,
2738                )
2739                .unwrap()
2740                .link(&engine, fn_registry.as_ref())
2741                .unwrap();
2742                let mut router = MethodAwareRouter::default();
2743                router.add(None, "", instance);
2744                router
2745            };
2746            let tcp_listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
2747                .await
2748                .unwrap();
2749            let server_addr = tcp_listener.local_addr().unwrap();
2750            info!("JS webhook listening on port {}", server_addr.port());
2751            let (server_termination_sender, server_termination_watcher) = watch::channel(());
2752            let (wh_server_state_sender, wh_server_state_watcher) =
2753                watch::channel(Arc::new(WebhookServerState {
2754                    deployment_id: DEPLOYMENT_ID_DUMMY,
2755                    router: Arc::new(router),
2756                    fn_registry,
2757                }));
2758            let mut set = tokio::task::JoinSet::new();
2759            set.spawn(webhook_trigger::server(
2760                "test-js".to_string(),
2761                tcp_listener,
2762                engine,
2763                wh_server_state_watcher,
2764                db_forwarder_sender,
2765                db_pool,
2766                sim_clock.clone_box(),
2767                Arc::new(TokioSleep),
2768                None,
2769                server_termination_watcher,
2770            ));
2771            (
2772                set,
2773                server_addr,
2774                WatchGuard {
2775                    server_termination_sender,
2776                    wh_server_state_sender,
2777                },
2778            )
2779        }
2780
2781        #[tokio::test]
2782        async fn webhook_js_hello_world() {
2783            test_utils::set_up();
2784            let js_source = r#"
2785                export default function handle(request) {
2786                    return new Response("Hello from JS!", {
2787                        status: 200,
2788                        headers: { "content-type": "text/plain" },
2789                    });
2790                }
2791            "#;
2792            let (_server, server_addr, _termination_sender) =
2793                start_js_webhook_server(js_source).await;
2794            let resp = reqwest::get(format!("http://{server_addr}/"))
2795                .await
2796                .unwrap();
2797            assert_eq!(resp.status().as_u16(), 200);
2798            assert_eq!("Hello from JS!", resp.text().await.unwrap());
2799        }
2800
2801        #[tokio::test]
2802        async fn webhook_js_reads_request_method_and_url() {
2803            test_utils::set_up();
2804            let js_source = r#"
2805                export default function handle(request) {
2806                    return new Response(request.method + " " + request.url);
2807                }
2808            "#;
2809            let (_server, server_addr, _termination_sender) =
2810                start_js_webhook_server(js_source).await;
2811            let resp = reqwest::get(format!("http://{server_addr}/some/path"))
2812                .await
2813                .unwrap();
2814            assert_eq!(resp.status().as_u16(), 200);
2815            let body = resp.text().await.unwrap();
2816            assert!(
2817                body.contains("GET") && body.contains("/some/path"),
2818                "Expected method and path in response, got: {body}"
2819            );
2820        }
2821
2822        #[tokio::test]
2823        async fn webhook_js_custom_status_code() {
2824            test_utils::set_up();
2825            let js_source = r#"
2826                export default function handle(request) {
2827                    return new Response("created", { status: 201 });
2828                }
2829            "#;
2830            let (_server, server_addr, _termination_sender) =
2831                start_js_webhook_server(js_source).await;
2832            let resp = reqwest::get(format!("http://{server_addr}/"))
2833                .await
2834                .unwrap();
2835            assert_eq!(resp.status().as_u16(), 201);
2836            assert_eq!("created", resp.text().await.unwrap());
2837        }
2838
2839        async fn start_js_webhook_server_with_http(
2840            source: &str,
2841            allowed_host: &str,
2842        ) -> (
2843            tokio::task::JoinSet<Result<(), WebhookServerError>>,
2844            SocketAddr,
2845            WatchGuard,
2846        ) {
2847            use crate::http_request_policy::{AllowedHostConfig, HostPattern, MethodsPattern};
2848            let host_pattern =
2849                HostPattern::parse_with_methods(allowed_host, MethodsPattern::AllMethods).unwrap();
2850            let sim_clock = SimClock::default();
2851            let (_guard, db_pool, _db_close) = db_tests::Database::Sqlite.set_up().await;
2852            let fn_registry = TestingFnRegistry::new_from_components(vec![]);
2853            let engine = Engines::get_webhook_engine(EngineConfig::on_demand_testing()).unwrap();
2854            let (db_forwarder_sender, _) = mpsc::channel(1);
2855            let wasm_file = webhook_js_runtime_builder::WEBHOOK_JS_RUNTIME;
2856            let router = {
2857                let runnable_component =
2858                    RunnableComponent::new(wasm_file, &engine, ComponentType::WebhookEndpoint)
2859                        .unwrap();
2860                let instance = WebhookEndpointCompiled::new(
2861                    WebhookEndpointConfig {
2862                        component_id: ComponentId::new(
2863                            ComponentType::WebhookEndpoint,
2864                            StrVariant::empty(),
2865                            ComponentDigest(calculate_sha256_file(wasm_file).await.unwrap().0),
2866                        )
2867                        .unwrap(),
2868                        forward_stdout: Some(StdOutputConfig::Stdout),
2869                        forward_stderr: Some(StdOutputConfig::Stdout),
2870                        env_vars: Arc::from([]),
2871                        fuel: None,
2872                        backtrace_persist: false,
2873                        subscription_interruption: None,
2874                        logs_store_min_level: None,
2875                        allowed_hosts: Arc::from(vec![AllowedHostConfig {
2876                            pattern: host_pattern,
2877                            secret_env_mappings: Vec::new(),
2878                            replace_in: hashbrown::HashSet::new(),
2879                        }]),
2880                        js_config: Some(WebhookEndpointJsConfig {
2881                            source: source.to_string(),
2882                            file_name: String::new(),
2883                            resolved_imports: HashMap::new(),
2884                        }),
2885                        config_section_hint:
2886                            crate::http_hooks::ConfigSectionHint::WebhookEndpointJs,
2887                    },
2888                    runnable_component,
2889                )
2890                .unwrap()
2891                .link(&engine, fn_registry.as_ref())
2892                .unwrap();
2893                let mut router = MethodAwareRouter::default();
2894                router.add(None, "", instance);
2895                router
2896            };
2897            let tcp_listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
2898                .await
2899                .unwrap();
2900            let server_addr = tcp_listener.local_addr().unwrap();
2901            info!(
2902                "JS webhook with HTTP listening on port {}",
2903                server_addr.port()
2904            );
2905            let (server_termination_sender, server_termination_watcher) = watch::channel(());
2906            let (wh_server_state_sender, wh_server_state_watcher) =
2907                watch::channel(Arc::new(WebhookServerState {
2908                    deployment_id: DEPLOYMENT_ID_DUMMY,
2909                    router: Arc::new(router),
2910                    fn_registry,
2911                }));
2912            let mut set = tokio::task::JoinSet::new();
2913            set.spawn(webhook_trigger::server(
2914                "test-js-http".to_string(),
2915                tcp_listener,
2916                engine,
2917                wh_server_state_watcher,
2918                db_forwarder_sender,
2919                db_pool,
2920                sim_clock.clone_box(),
2921                Arc::new(TokioSleep),
2922                None,
2923                server_termination_watcher,
2924            ));
2925            (
2926                set,
2927                server_addr,
2928                WatchGuard {
2929                    server_termination_sender,
2930                    wh_server_state_sender,
2931                },
2932            )
2933        }
2934
2935        #[tokio::test]
2936        async fn webhook_js_fetch_get() {
2937            use wiremock::{
2938                Mock, MockServer, ResponseTemplate,
2939                matchers::{method, path},
2940            };
2941            test_utils::set_up();
2942            let mock_server = MockServer::start().await;
2943            Mock::given(method("GET"))
2944                .and(path("/hello"))
2945                .respond_with(ResponseTemplate::new(200).set_body_string("fetch works"))
2946                .expect(1)
2947                .mount(&mock_server)
2948                .await;
2949
2950            let url = mock_server.uri();
2951            let js_source = format!(
2952                r#"
2953                export default async function handle(request) {{
2954                    const resp = await fetch("{url}/hello");
2955                    const text = await resp.text();
2956                    return new Response(text);
2957                }}
2958                "#
2959            );
2960
2961            let allowed = format!("http://127.0.0.1:{}", mock_server.address().port());
2962            let (_server, server_addr, _termination_sender) =
2963                start_js_webhook_server_with_http(&js_source, &allowed).await;
2964            let resp = reqwest::get(format!("http://{server_addr}/"))
2965                .await
2966                .unwrap();
2967            let status = resp.status().as_u16();
2968            let body = resp.text().await.unwrap();
2969            assert_eq!((status, body.as_str()), (200, "fetch works"));
2970        }
2971
2972        #[tokio::test]
2973        async fn webhook_js_request_headers() {
2974            test_utils::set_up();
2975            // JS handler that returns the x-custom header values as a JSON array.
2976            // request.headers is a proper Headers object; multiple values for the
2977            // same header name are combined with a comma by Headers.get().
2978            let js_source = r#"
2979                export default function handle(request) {
2980                    const value = request.headers.get("x-custom");
2981                    console.log("header value:`" +value + "`");
2982                    return Response.json(value !== null ? value.split(", ") : []);
2983                }
2984                "#;
2985
2986            let (_server, server_addr, _termination_sender) =
2987                start_js_webhook_server(js_source).await;
2988
2989            // Send request with multiple values for the same header
2990            let client = reqwest::Client::new();
2991            let resp = client
2992                .get(format!("http://{server_addr}/"))
2993                .header("x-custom", "value1")
2994                .header("x-custom", "value2")
2995                .send()
2996                .await
2997                .unwrap();
2998
2999            let status = resp.status().as_u16();
3000            let body = resp.text().await.unwrap();
3001            assert_eq!(status, 200);
3002            let headers: Vec<String> = serde_json::from_str(&body).unwrap();
3003            assert_eq!(headers, vec!["value1", "value2"]);
3004        }
3005
3006        #[tokio::test]
3007        async fn webhook_js_fetch_proxy_headers() {
3008            use wiremock::{
3009                Mock, MockServer, ResponseTemplate,
3010                matchers::{header, method},
3011            };
3012            test_utils::set_up();
3013            let mock_server = MockServer::start().await;
3014            Mock::given(method("GET"))
3015                .and(header("x-forwarded", "proxy-value"))
3016                .respond_with(ResponseTemplate::new(200).set_body_string("proxied"))
3017                .expect(1)
3018                .mount(&mock_server)
3019                .await;
3020
3021            let url = mock_server.uri();
3022            let js_source = format!(
3023                r#"
3024                export default async function handle(request) {{
3025                    console.log("Got headers", [...request.headers]);
3026                    return fetch("{url}/", {{ headers: request.headers }});
3027                }}
3028                "#
3029            );
3030
3031            let allowed = format!("http://127.0.0.1:{}", mock_server.address().port());
3032            let (_server, server_addr, _termination_sender) =
3033                start_js_webhook_server_with_http(&js_source, &allowed).await;
3034
3035            let client = reqwest::Client::new();
3036            let resp = client
3037                .get(format!("http://{server_addr}/"))
3038                .header("x-forwarded", "proxy-value")
3039                .send()
3040                .await
3041                .unwrap();
3042
3043            let status = resp.status().as_u16();
3044            let body = resp.text().await.unwrap();
3045            assert_eq!((status, body.as_str()), (200, "proxied"));
3046        }
3047
3048        #[tokio::test]
3049        async fn webhook_js_env_var() {
3050            test_utils::set_up();
3051            let js_source = r#"
3052                export default function handle(request) {
3053                    const jsSource = process.env["__OBELISK_JS_SOURCE__"];
3054                    const missing = process.env["MISSING_VAR"];
3055                    return Response.json({
3056                        hasJsSource: jsSource !== undefined,
3057                        missingIsUndefined: missing === undefined,
3058                    });
3059                }
3060            "#;
3061            let (_server, server_addr, _termination_sender) =
3062                start_js_webhook_server(js_source).await;
3063            let resp = reqwest::get(format!("http://{server_addr}/"))
3064                .await
3065                .unwrap();
3066            assert_eq!(resp.status().as_u16(), 200);
3067            let body: serde_json::Value = resp.json().await.unwrap();
3068            assert_eq!(body["hasJsSource"], serde_json::json!(true));
3069            assert_eq!(body["missingIsUndefined"], serde_json::json!(true));
3070        }
3071
3072        #[tokio::test]
3073        async fn webhook_js_request_body_text() {
3074            test_utils::set_up();
3075            let js_source = r#"
3076                export default async function handle(request) {
3077                    const text = await request.text();
3078                    return new Response(text, { headers: { "content-type": "text/plain" } });
3079                }
3080            "#;
3081            let (_server, server_addr, _termination_sender) =
3082                start_js_webhook_server(js_source).await;
3083            let client = reqwest::Client::new();
3084            let resp = client
3085                .post(format!("http://{server_addr}/"))
3086                .body("hello from body")
3087                .send()
3088                .await
3089                .unwrap();
3090            assert_eq!(resp.status().as_u16(), 200);
3091            assert_eq!(resp.text().await.unwrap(), "hello from body");
3092        }
3093
3094        #[tokio::test]
3095        async fn webhook_js_request_body_json() {
3096            test_utils::set_up();
3097            let js_source = r"
3098                export default async function handle(request) {
3099                    const data = await request.json();
3100                    return Response.json({ received: data });
3101                }
3102            ";
3103            let (_server, server_addr, _termination_sender) =
3104                start_js_webhook_server(js_source).await;
3105            let client = reqwest::Client::new();
3106            let resp = client
3107                .post(format!("http://{server_addr}/"))
3108                .header("content-type", "application/json")
3109                .body(r#"{"name":"world","value":42}"#)
3110                .send()
3111                .await
3112                .unwrap();
3113            assert_eq!(resp.status().as_u16(), 200);
3114            let body: serde_json::Value = resp.json().await.unwrap();
3115            assert_eq!(body["received"]["name"], "world");
3116            assert_eq!(body["received"]["value"], 42);
3117        }
3118
3119        #[tokio::test]
3120        async fn webhook_js_request_body_form_data() {
3121            test_utils::set_up();
3122            let js_source = r"
3123                export default async function handle(request) {
3124                    const form = await request.formData();
3125                    return Response.json(form);
3126                }
3127            ";
3128            let (_server, server_addr, _termination_sender) =
3129                start_js_webhook_server(js_source).await;
3130            let client = reqwest::Client::new();
3131            let resp = client
3132                .post(format!("http://{server_addr}/"))
3133                .header("content-type", "application/x-www-form-urlencoded")
3134                .body("name=Alice&city=Wonderland")
3135                .send()
3136                .await
3137                .unwrap();
3138            assert_eq!(resp.status().as_u16(), 200);
3139            let body: serde_json::Value = resp.json().await.unwrap();
3140            assert_eq!(body["name"], "Alice");
3141            assert_eq!(body["city"], "Wonderland");
3142        }
3143
3144        #[tokio::test]
3145        async fn webhook_js_request_body_empty() {
3146            test_utils::set_up();
3147            // GET request with no body — text() should return an empty string.
3148            let js_source = r"
3149                export default async function handle(request) {
3150                    const text = await request.text();
3151                    return new Response(JSON.stringify({ len: text.length }));
3152                }
3153            ";
3154            let (_server, server_addr, _termination_sender) =
3155                start_js_webhook_server(js_source).await;
3156            let resp = reqwest::get(format!("http://{server_addr}/"))
3157                .await
3158                .unwrap();
3159            assert_eq!(resp.status().as_u16(), 200);
3160            let body: serde_json::Value = resp.json().await.unwrap();
3161            assert_eq!(body["len"], 0);
3162        }
3163
3164        #[tokio::test]
3165        async fn webhook_js_generate_execution_id() {
3166            test_utils::set_up();
3167            let js_source = r#"
3168                export default function handle(request) {
3169                    const id1 = obelisk.executionIdGenerate();
3170                    const id2 = obelisk.executionIdGenerate();
3171                    return Response.json({
3172                        id1,
3173                        id2,
3174                        different: id1 !== id2,
3175                        hasPrefix: id1.startsWith("E_"),
3176                    });
3177                }
3178            "#;
3179            let (_server, server_addr, _termination_sender) =
3180                start_js_webhook_server(js_source).await;
3181            let resp = reqwest::get(format!("http://{server_addr}/"))
3182                .await
3183                .unwrap();
3184            assert_eq!(resp.status().as_u16(), 200);
3185            let body: serde_json::Value = resp.json().await.unwrap();
3186            assert_eq!(body["different"], serde_json::json!(true));
3187            assert_eq!(body["hasPrefix"], serde_json::json!(true));
3188        }
3189
3190        /// Test harness for JS webhook tests that need to call activities/workflows.
3191        struct JsWebhookWithActivitiesHarness {
3192            #[expect(dead_code)]
3193            server_set: tokio::task::JoinSet<Result<(), WebhookServerError>>,
3194            server_addr: SocketAddr,
3195            activity_exec: executor::executor::ExecTask,
3196            sim_clock: SimClock,
3197            db_pool: Arc<dyn concepts::storage::DbPool>,
3198            #[expect(dead_code)]
3199            db_close: db_tests::DbPoolCloseableWrapper,
3200            #[expect(dead_code)]
3201            guard: WatchGuard,
3202        }
3203
3204        impl JsWebhookWithActivitiesHarness {
3205            async fn new(js_source: &str) -> Self {
3206                use crate::activity::activity_worker::test::compile_activity;
3207                use crate::activity::activity_worker::tests::new_activity_fibo;
3208                use concepts::time::TokioSleep;
3209                use executor::executor::LockingStrategy;
3210
3211                let sim_clock = SimClock::default();
3212                let (_guard, db_pool, db_close) = db_tests::Database::Sqlite.set_up().await;
3213
3214                // Set up fibo activity worker
3215                let activity_exec = new_activity_fibo(
3216                    db_pool.clone(),
3217                    sim_clock.clone_box(),
3218                    TokioSleep,
3219                    LockingStrategy::ByComponentDigest,
3220                )
3221                .await;
3222
3223                // Create fn_registry with fibo activity
3224                let fn_registry = TestingFnRegistry::new_from_components(vec![
3225                    compile_activity(
3226                        test_programs_fibo_activity_builder::TEST_PROGRAMS_FIBO_ACTIVITY,
3227                    )
3228                    .await,
3229                ]);
3230
3231                let engine =
3232                    Engines::get_webhook_engine(EngineConfig::on_demand_testing()).unwrap();
3233                let (db_forwarder_sender, _) = mpsc::channel(1);
3234                let wasm_file = webhook_js_runtime_builder::WEBHOOK_JS_RUNTIME;
3235
3236                let router = {
3237                    let runnable_component =
3238                        RunnableComponent::new(wasm_file, &engine, ComponentType::WebhookEndpoint)
3239                            .unwrap();
3240                    let instance = WebhookEndpointCompiled::new(
3241                        WebhookEndpointConfig {
3242                            component_id: ComponentId::new(
3243                                ComponentType::WebhookEndpoint,
3244                                StrVariant::empty(),
3245                                ComponentDigest(calculate_sha256_file(wasm_file).await.unwrap().0),
3246                            )
3247                            .unwrap(),
3248                            forward_stdout: Some(StdOutputConfig::Stdout),
3249                            forward_stderr: Some(StdOutputConfig::Stdout),
3250                            env_vars: Arc::from([]),
3251                            fuel: None,
3252                            backtrace_persist: false,
3253                            subscription_interruption: None,
3254                            logs_store_min_level: None,
3255                            allowed_hosts: Arc::from([]),
3256                            js_config: Some(WebhookEndpointJsConfig {
3257                                source: js_source.to_string(),
3258                                file_name: String::new(),
3259                                resolved_imports: HashMap::new(),
3260                            }),
3261                            config_section_hint:
3262                                crate::http_hooks::ConfigSectionHint::WebhookEndpointJs,
3263                        },
3264                        runnable_component,
3265                    )
3266                    .unwrap()
3267                    .link(&engine, fn_registry.as_ref())
3268                    .unwrap();
3269                    let mut router = MethodAwareRouter::default();
3270                    router.add(None, "", instance);
3271                    router
3272                };
3273
3274                let tcp_listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
3275                    .await
3276                    .unwrap();
3277                let server_addr = tcp_listener.local_addr().unwrap();
3278                info!(
3279                    "JS webhook with activities listening on port {}",
3280                    server_addr.port()
3281                );
3282                let (server_termination_sender, server_termination_watcher) = watch::channel(());
3283                let (wh_server_state_sender, wh_server_state_watcher) =
3284                    watch::channel(Arc::new(WebhookServerState {
3285                        deployment_id: DEPLOYMENT_ID_DUMMY,
3286                        router: Arc::new(router),
3287                        fn_registry,
3288                    }));
3289                let mut server_set = tokio::task::JoinSet::new();
3290                server_set.spawn(webhook_trigger::server(
3291                    "test-js-activities".to_string(),
3292                    tcp_listener,
3293                    engine,
3294                    wh_server_state_watcher,
3295                    db_forwarder_sender,
3296                    db_pool.clone(),
3297                    sim_clock.clone_box(),
3298                    Arc::new(TokioSleep),
3299                    None,
3300                    server_termination_watcher,
3301                ));
3302
3303                Self {
3304                    server_set,
3305                    server_addr,
3306                    activity_exec,
3307                    sim_clock,
3308                    db_pool,
3309                    db_close,
3310                    guard: WatchGuard {
3311                        server_termination_sender,
3312                        wh_server_state_sender,
3313                    },
3314                }
3315            }
3316
3317            async fn tick_activity(&self) -> usize {
3318                use concepts::prefixed_ulid::RunId;
3319                self.activity_exec
3320                    .tick_test_await(self.sim_clock.now(), RunId::generate())
3321                    .await
3322                    .len()
3323            }
3324        }
3325
3326        #[tokio::test]
3327        async fn webhook_js_call_activity() {
3328            test_utils::set_up();
3329            let js_source = r#"
3330                export default function handle(request) {
3331                    // Call fibo(10) directly
3332                    const result = obelisk.call("testing:fibo/fibo.fibo", [10]);
3333                    return Response.json({ result });
3334                }
3335            "#;
3336
3337            let harness = JsWebhookWithActivitiesHarness::new(js_source).await;
3338
3339            // Start the webhook request in background
3340            let server_addr = harness.server_addr;
3341            let fetch_task = tokio::spawn(async move {
3342                reqwest::get(format!("http://{server_addr}/"))
3343                    .await
3344                    .unwrap()
3345            });
3346
3347            // Poll until the activity is pending and execute it
3348            while harness.tick_activity().await == 0 {
3349                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3350            }
3351
3352            // Get the response
3353            let resp = fetch_task.await.unwrap();
3354            assert_eq!(resp.status().as_u16(), 200);
3355            let body: serde_json::Value = resp.json().await.unwrap();
3356            assert_eq!(body["result"], serde_json::json!(55)); // fibo(10) = 55
3357        }
3358
3359        #[tokio::test]
3360        async fn webhook_js_schedule_activity() {
3361            use std::str::FromStr as _;
3362
3363            test_utils::set_up();
3364            let js_source = r#"
3365                export default function handle(request) {
3366                    // Schedule fibo(10) for later execution
3367                    const execId = obelisk.executionIdGenerate();
3368                    obelisk.schedule(execId, "testing:fibo/fibo.fibo", [10], { seconds: 60 });
3369                    return Response.json({ execId });
3370                }
3371            "#;
3372
3373            let harness = JsWebhookWithActivitiesHarness::new(js_source).await;
3374            let resp = reqwest::get(format!("http://{}/", harness.server_addr))
3375                .await
3376                .unwrap();
3377            assert_eq!(resp.status().as_u16(), 200);
3378            let body: serde_json::Value = resp.json().await.unwrap();
3379
3380            // Verify an execution ID was returned
3381            let exec_id_str = body["execId"].as_str().unwrap();
3382            assert!(
3383                exec_id_str.starts_with("E_"),
3384                "Expected execution ID prefix"
3385            );
3386
3387            // Verify the execution was created in the database
3388            let exec_id = concepts::ExecutionId::from_str(exec_id_str).unwrap();
3389            let conn = harness.db_pool.connection().await.unwrap();
3390            let create_req = conn.get_create_request(&exec_id).await.unwrap();
3391            assert_eq!(
3392                "testing:fibo/fibo.fibo",
3393                create_req.ffqn.to_string().as_str()
3394            );
3395        }
3396
3397        #[tokio::test]
3398        async fn webhook_js_get_status() {
3399            test_utils::set_up();
3400            let js_source = r#"
3401                export default function handle(request) {
3402                    // Schedule for later, then check status
3403                    const execId = obelisk.executionIdGenerate();
3404                    obelisk.schedule(execId, "testing:fibo/fibo.fibo", [10], { seconds: 60 });
3405                    const status = obelisk.getStatus(execId);
3406                    return Response.json({ execId, executionStatus: status });
3407                }
3408            "#;
3409
3410            let harness = JsWebhookWithActivitiesHarness::new(js_source).await;
3411            let resp = reqwest::get(format!("http://{}/", harness.server_addr))
3412                .await
3413                .unwrap();
3414            assert_eq!(resp.status().as_u16(), 200);
3415            let body: serde_json::Value = resp.json().await.unwrap();
3416
3417            // Verify status is "pendingAt" (scheduled for later)
3418            assert_eq!(
3419                body["executionStatus"]["status"],
3420                serde_json::json!("pendingAt")
3421            );
3422        }
3423
3424        #[tokio::test]
3425        async fn webhook_js_try_get_pending() {
3426            test_utils::set_up();
3427            let js_source = r#"
3428                export default function handle(request) {
3429                    // Schedule now but don't wait for completion
3430                    const execId = obelisk.executionIdGenerate();
3431                    obelisk.schedule(execId, "testing:fibo/fibo.fibo", [10]);
3432                    const result = obelisk.tryGet(execId);
3433                    return Response.json({ pending: result === undefined });
3434                }
3435            "#;
3436
3437            let harness = JsWebhookWithActivitiesHarness::new(js_source).await;
3438            let resp = reqwest::get(format!("http://{}/", harness.server_addr))
3439                .await
3440                .unwrap();
3441            assert_eq!(resp.status().as_u16(), 200);
3442            let body: serde_json::Value = resp.json().await.unwrap();
3443
3444            // Should return undefined since activity hasn't run yet
3445            assert_eq!(body["pending"], serde_json::json!(true));
3446        }
3447
3448        #[tokio::test]
3449        async fn webhook_js_call_with_error() {
3450            test_utils::set_up();
3451            // fibo(50) returns Err(()) in the test activity (n > 40 returns error)
3452            let js_source = r#"
3453                export default function handle(request) {
3454                    try {
3455                        obelisk.call("testing:fibo/fibo.fibo", [50]);
3456                        return Response.json({ result: "unexpected success" });
3457                    } catch (e) {
3458                        return Response.json({ error: e.message });
3459                    }
3460                }
3461            "#;
3462
3463            let harness = JsWebhookWithActivitiesHarness::new(js_source).await;
3464
3465            let server_addr = harness.server_addr;
3466            let fetch_task = tokio::spawn(async move {
3467                reqwest::get(format!("http://{server_addr}/"))
3468                    .await
3469                    .unwrap()
3470            });
3471
3472            // Poll until the activity is pending and execute it
3473            while harness.tick_activity().await == 0 {
3474                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3475            }
3476
3477            let resp = fetch_task.await.unwrap();
3478            assert_eq!(resp.status().as_u16(), 200);
3479            let body: serde_json::Value = resp.json().await.unwrap();
3480            // Should have caught the error
3481            assert!(body["error"].is_string());
3482        }
3483    }
3484
3485    #[test]
3486    fn routes() {
3487        let mut router = MethodAwareRouter::default();
3488        router.add(Some(Method::GET), "/foo", 1);
3489        router.add(Some(Method::GET), "/foo/*", 2);
3490        router.add(None, "/foo", 3);
3491        router.add(None, "/*", 4);
3492        router.add(None, "/", 5);
3493        router.add(Some(Method::GET), "/path/:param1/:param2", 6);
3494
3495        assert_eq!(
3496            1,
3497            **router
3498                .find(&Method::GET, &Uri::from_static("/foo"))
3499                .unwrap()
3500                .handler()
3501        );
3502        assert_eq!(
3503            2,
3504            **router
3505                .find(&Method::GET, &Uri::from_static("/foo/"))
3506                .unwrap()
3507                .handler()
3508        );
3509        assert_eq!(
3510            2,
3511            **router
3512                .find(&Method::GET, &Uri::from_static("/foo/foo/"))
3513                .unwrap()
3514                .handler()
3515        );
3516        assert_eq!(
3517            2,
3518            **router
3519                .find(&Method::GET, &Uri::from_static("/foo/foo/bar"))
3520                .unwrap()
3521                .handler()
3522        );
3523        assert_eq!(
3524            3,
3525            **router
3526                .find(&Method::POST, &Uri::from_static("/foo"))
3527                .unwrap()
3528                .handler()
3529        );
3530        assert_eq!(
3531            5,
3532            **router
3533                .find(&Method::GET, &Uri::from_static("/"))
3534                .unwrap()
3535                .handler()
3536        );
3537
3538        let found = router
3539            .find(&Method::GET, &Uri::from_static("/path/p1/p2"))
3540            .unwrap();
3541        assert_eq!(6, **found.handler());
3542        assert_eq!(
3543            hashbrown::HashMap::from([("param1", "p1"), ("param2", "p2")]),
3544            found
3545                .params()
3546                .into_iter()
3547                .collect::<hashbrown::HashMap<_, _>>()
3548        );
3549        let found = router
3550            .find(&Method::GET, &Uri::from_static("/path/p1/p2/p3"))
3551            .unwrap();
3552        assert_eq!(4, **found.handler());
3553    }
3554
3555    #[test]
3556    fn routes_empty_fallback() {
3557        let mut router = MethodAwareRouter::default();
3558        router.add(Some(Method::GET), "/foo", 1);
3559        router.add(None, "", 9);
3560
3561        assert_eq!(
3562            1,
3563            **router
3564                .find(&Method::GET, &Uri::from_static("/foo"))
3565                .unwrap()
3566                .handler()
3567        );
3568        assert_eq!(
3569            9,
3570            **router
3571                .find(&Method::GET, &Uri::from_static("/"))
3572                .unwrap()
3573                .handler()
3574        );
3575        assert_eq!(
3576            9,
3577            **router
3578                .find(&Method::GET, &Uri::from_static("/x"))
3579                .unwrap()
3580                .handler()
3581        );
3582        assert_eq!(
3583            9,
3584            **router
3585                .find(&Method::GET, &Uri::from_static("/x/"))
3586                .unwrap()
3587                .handler()
3588        );
3589    }
3590}