1use crate::component_logger::{ComponentLogger, LogStrageConfig, log_activities};
2use crate::envvar::EnvVar;
3use crate::http_hooks::{HttpClientTracesContainer, HttpHooks};
4use crate::std_output_stream::{LogStream, StdOutput, StdOutputConfig, StdOutputConfigWithSender};
5use crate::webhook::webhook_registry::WebhookStateWatcher;
6use crate::webhook::webhook_trigger::types::obelisk::types::join_set::JoinNextError;
7use crate::webhook::webhook_trigger::types::{
8 GetErrorTrappable, GetStatusErrorTrappable, ScheduleJsonErrorTrappable, TryGetErrorTrappable,
9};
10use crate::workflow::host_exports::{SUFFIX_FN_SCHEDULE, history_event_schedule_at_from_wast_val};
11use crate::{RunnableComponent, WasmFileError};
12use assert_matches::assert_matches;
13use concepts::prefixed_ulid::{
14 DeploymentId, ExecutionIdDerived, ExecutionIdTopLevel, JOIN_SET_START_IDX, RunId,
15};
16use concepts::storage::{
17 AppendRequest, BacktraceInfo, CreateRequest, DbConnection, DbErrorGeneric, DbErrorRead,
18 DbErrorReadWithTimeout, DbErrorWrite, DbPool, ExecutionRequest, HistoryEvent,
19 HistoryEventScheduleAt, JoinSetRequest, LogInfoAppendRow, LogLevel, LogStreamType,
20 PendingState, PendingStateFinishedError, PendingStateFinishedResultKind, TimeoutOutcome,
21 Version, http_client_trace::HttpClientTrace,
22};
23use concepts::time::{ClockFn, Sleep};
24use concepts::{
25 ComponentId, ExecutionFailureKind, ExecutionId, ExecutionMetadata, FinishedExecutionFailure,
26 FunctionFqn, FunctionMetadata, FunctionRegistry, IfcFqnName, JoinSetKind, Params, ReturnType,
27 SUFFIX_PKG_SCHEDULE, SUPPORTED_RETURN_VALUE_OK_EMPTY, StrVariant, TrapKind,
28};
29use concepts::{JoinSetId, SupportedFunctionReturnValue};
30use http_body_util::combinators::UnsyncBoxBody;
31use hyper::body::Bytes;
32use hyper::server::conn::http1;
33use hyper::{Method, StatusCode, Uri};
34use hyper_util::rt::TokioIo;
35use log_activities::obelisk::log::log::Host;
36use route_recognizer::{Match, Router};
37use std::collections::HashMap;
38use std::ops::Deref;
39use std::pin::Pin;
40use std::str::FromStr;
41use std::time::Duration;
42use std::{fmt::Debug, sync::Arc};
43use tokio::net::TcpListener;
44use tokio::select;
45use tokio::sync::{OwnedSemaphorePermit, mpsc, watch};
46use tracing::{
47 Instrument, Span, debug, debug_span, error, info, info_span, instrument, trace, warn,
48};
49use types::obelisk::types::execution::Host as ExecutionHost;
50use types::obelisk::types::join_set::HostJoinSet;
51use types::obelisk::webhook::webhook_support::Host as WebhookSupportHost;
52use val_json::wast_val::WastVal;
53use wasmtime::component::ResourceTable;
54use wasmtime::component::types::ComponentFunc;
55use wasmtime::component::{Linker, Val};
56use wasmtime::{Engine, Store, UpdateDeadline};
57use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
58use wasmtime_wasi_http::WasiHttpCtx;
59use wasmtime_wasi_http::p2::bindings::ProxyPre;
60use wasmtime_wasi_http::p2::bindings::http::types::Scheme;
61use wasmtime_wasi_http::p2::body::HyperOutgoingBody;
62use wasmtime_wasi_http::p2::{WasiHttpCtxView, WasiHttpView};
63use wasmtime_wasi_io::IoView;
64
/// Fully qualified name of the `handle` function of the `wasi:http/incoming-handler` interface.
const HTTP_HANDLER_FFQN: FunctionFqn =
    FunctionFqn::new_static("wasi:http/incoming-handler", "handle");
67
/// Host-side bindings generated from the WIT definitions in `host-wit-webhook/`, plus the
/// "trappable" error wrappers that the generated code is configured to use.
pub(crate) mod types {
    // The inline world only lists the interfaces the host has to provide; the interface
    // definitions themselves are read from `path`.
    wasmtime::component::bindgen!({
        path: "host-wit-webhook/",
        inline: "package any:any;
                world bindings {
                    import obelisk:types/time@4.2.0;
                    import obelisk:types/execution@4.2.0;
                    import obelisk:types/backtrace@4.2.0;
                    import obelisk:types/join-set@4.2.0;
                    import obelisk:webhook/webhook-support@5.2.0;
                }",
        world: "any:any/bindings",
        imports: {
            // The webhook-support host functions are generated as `async` and trappable.
            "obelisk:webhook/webhook-support": async | trappable,
        },
        with: {
            // Represent the WIT `join-set` resource with the existing `JoinSetId` type.
            "obelisk:types/join-set.join-set": concepts::JoinSetId,
        },
        // Map WIT error types to the wrapper enums below so that a host function can either
        // return the WIT-level error or raise a trap.
        trappable_error_type: {
            "obelisk:types/execution.schedule-json-error" => crate::webhook::webhook_trigger::types::ScheduleJsonErrorTrappable,
            "obelisk:webhook/webhook-support.get-error" => crate::webhook::webhook_trigger::types::GetErrorTrappable,
            "obelisk:webhook/webhook-support.get-status-error" => crate::webhook::webhook_trigger::types::GetStatusErrorTrappable,
            "obelisk:webhook/webhook-support.try-get-error" => crate::webhook::webhook_trigger::types::TryGetErrorTrappable,
        },
    });

    /// Error of a `schedule-json`-style host call: either the WIT-level
    /// `schedule-json-error` (`Normal`) or a `wasmtime::Error` (`Trap`).
    #[derive(Debug, thiserror::Error)]
    pub enum ScheduleJsonErrorTrappable {
        /// WIT-level error value.
        #[error(transparent)]
        Normal(#[from] obelisk::types::execution::ScheduleJsonError),
        /// Host-side failure carried as a `wasmtime::Error`.
        #[error(transparent)]
        Trap(#[from] wasmtime::Error),
    }

    /// Error wrapper for the webhook-support `get-error` type: either the WIT-level error
    /// (`Normal`) or a `wasmtime::Error` (`Trap`).
    #[derive(Debug, thiserror::Error)]
    pub enum GetErrorTrappable {
        /// WIT-level error value.
        #[error(transparent)]
        Normal(#[from] obelisk::webhook::webhook_support::GetError),
        /// Host-side failure carried as a `wasmtime::Error`.
        #[error(transparent)]
        Trap(#[from] wasmtime::Error),
    }

    /// Error wrapper for the webhook-support `get-status-error` type: either the WIT-level
    /// error (`Normal`) or a `wasmtime::Error` (`Trap`).
    #[derive(Debug, thiserror::Error)]
    pub enum GetStatusErrorTrappable {
        /// WIT-level error value.
        #[error(transparent)]
        Normal(#[from] obelisk::webhook::webhook_support::GetStatusError),
        /// Host-side failure carried as a `wasmtime::Error`.
        #[error(transparent)]
        Trap(#[from] wasmtime::Error),
    }

    /// Error wrapper for the webhook-support `try-get-error` type: either the WIT-level
    /// error (`Normal`) or a `wasmtime::Error` (`Trap`).
    #[derive(Debug, thiserror::Error)]
    pub enum TryGetErrorTrappable {
        /// WIT-level error value.
        #[error(transparent)]
        Normal(#[from] obelisk::webhook::webhook_support::TryGetError),
        /// Host-side failure carried as a `wasmtime::Error`.
        #[error(transparent)]
        Trap(#[from] wasmtime::Error),
    }
}
131
132fn supported_return_value_to_json_result(
139 retval: &SupportedFunctionReturnValue,
140) -> Result<Option<String>, Option<String>> {
141 match retval {
142 SupportedFunctionReturnValue::Ok(Some(val_with_type)) => {
143 let json =
144 serde_json::to_string(&val_with_type.value).unwrap_or_else(|_| "null".to_string());
145 Ok(Some(json))
146 }
147 SupportedFunctionReturnValue::Ok(None) => Ok(None),
148 SupportedFunctionReturnValue::Err(Some(val_with_type)) => {
149 let json =
150 serde_json::to_string(&val_with_type.value).unwrap_or_else(|_| "null".to_string());
151 Err(Some(json))
152 }
153 SupportedFunctionReturnValue::Err(None) => Err(None),
154 SupportedFunctionReturnValue::ExecutionFailure(err) => {
155 Err(Some(format!("execution error: {err}")))
156 }
157 }
158}
159
160fn schedule_at_from_webhook(
161 schedule_at: types::obelisk::webhook::webhook_support::ScheduleAt,
162) -> HistoryEventScheduleAt {
163 use chrono::{DateTime, Utc};
164 use std::time::UNIX_EPOCH;
165 use types::obelisk::webhook::webhook_support::ScheduleAt;
166
167 match schedule_at {
168 ScheduleAt::Now => HistoryEventScheduleAt::Now,
169 ScheduleAt::At(datetime) => {
170 let duration = Duration::new(datetime.seconds, datetime.nanoseconds);
171 let systemtime = UNIX_EPOCH + duration;
172 HistoryEventScheduleAt::At(DateTime::<Utc>::from(systemtime))
173 }
174 ScheduleAt::In(duration) => {
175 use types::obelisk::types::time::Duration as WitDuration;
176 let std_duration = match duration {
177 WitDuration::Milliseconds(millis) => Duration::from_millis(millis),
178 WitDuration::Seconds(secs) => Duration::from_secs(secs),
179 WitDuration::Minutes(mins) => Duration::from_secs(u64::from(mins * 60)),
180 WitDuration::Hours(hours) => Duration::from_secs(u64::from(hours * 60 * 60)),
181 WitDuration::Days(days) => Duration::from_secs(u64::from(days * 24 * 60 * 60)),
182 };
183 HistoryEventScheduleAt::In(std_duration)
184 }
185 }
186}
187
/// Serializable configuration of an HTTP trigger.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct HttpTriggerConfig {
    /// Identifier of the component this configuration belongs to.
    pub component_id: ComponentId,
}
/// Boxed, thread-safe (`Send + Sync`) error trait object.
type StdError = Box<dyn std::error::Error + Send + Sync>;
193
/// Errors reported by the webhook HTTP server.
#[derive(Debug, thiserror::Error)]
pub enum WebhookServerError {
    /// An I/O error raised by the listening socket.
    #[error("socket error: {0}")]
    SocketError(std::io::Error),
}
199
/// A webhook endpoint whose component is available but has not been linked yet.
///
/// Call [`WebhookEndpointCompiled::link`] to obtain a [`WebhookEndpointInstanceLinked`].
pub struct WebhookEndpointCompiled {
    /// Endpoint configuration.
    pub config: WebhookEndpointConfig,
    /// The component backing this endpoint.
    pub runnable_component: RunnableComponent,
}
204
impl WebhookEndpointCompiled {
    /// Bundles the endpoint configuration with its component.
    ///
    /// # Errors
    /// The current implementation always returns `Ok`; the `Result` is part of the signature.
    pub fn new(
        config: WebhookEndpointConfig,
        runnable_component: RunnableComponent,
    ) -> Result<Self, WasmFileError> {
        Ok(Self {
            config,
            runnable_component,
        })
    }

    /// Returns the flattened list of functions imported by the component.
    #[must_use]
    pub fn imports(&self) -> &[FunctionMetadata] {
        &self.runnable_component.wasm_component.exim.imports_flat
    }

    /// Links the component against WASI, WASI HTTP, the webhook host functions and every
    /// eligible export found in `fn_registry`, producing a pre-instantiated endpoint.
    ///
    /// # Errors
    /// Returns a [`WasmFileError`] linking error when any of the linker steps, the JS import
    /// resolution, or the pre-instantiation fails.
    #[instrument(skip_all, fields(component_id = %self.config.component_id), err)]
    pub fn link(
        self,
        engine: &Engine,
        fn_registry: &dyn FunctionRegistry,
    ) -> Result<WebhookEndpointInstanceLinked, WasmFileError> {
        let mut linker = Linker::new(engine);
        // Standard host interfaces first: WASI, then WASI HTTP.
        wasmtime_wasi::p2::add_to_linker_async(&mut linker)
            .map_err(|err| WasmFileError::linking_error("cannot link `wasmtime_wasi`", err))?;
        wasmtime_wasi_http::p2::add_only_http_to_linker_async(&mut linker)
            .map_err(|err| WasmFileError::linking_error("cannot link `wasmtime_wasi_http`", err))?;
        // Host functions implemented by `WebhookEndpointCtx`.
        WebhookEndpointCtx::add_to_linker(&mut linker)?;

        // Register a host function for every function exported by the registry, skipping:
        // * interfaces in the `obelisk` and `wasi` namespaces,
        // * extension interfaces, except those for which
        //   `package_strip_obelisk_schedule_suffix()` returns `Some`.
        for import in fn_registry
            .all_exports()
            .iter()
            .filter(|import| {
                !import.ifc_fqn.is_namespace_obelisk() && !import.ifc_fqn.is_namespace_wasi()
            })
            .filter(|import| {
                !import.ifc_fqn.is_extension()
                    || import
                        .ifc_fqn
                        .package_strip_obelisk_schedule_suffix()
                        .is_some()
            })
        {
            trace!(
                ifc_fqn = %import.ifc_fqn,
                "Adding imported interface to the linker",
            );
            match linker.instance(import.ifc_fqn.deref()) {
                Ok(mut linker_instance) => {
                    for function_name in import.fns.keys() {
                        let ffqn = FunctionFqn {
                            ifc_fqn: import.ifc_fqn.clone(),
                            function_name: function_name.clone(),
                        };
                        trace!("Adding mock for imported function {ffqn} to the linker");
                        let res = linker_instance.func_new_async(function_name.deref(), {
                            let ffqn = ffqn.clone();
                            move |mut store_ctx: wasmtime::StoreContextMut<
                                '_,
                                WebhookEndpointCtx,
                            >,
                                  _component_func: ComponentFunc,
                                  params: &[Val],
                                  results: &mut [Val]| {
                                let ffqn = ffqn.clone();
                                // The backtrace is captured before entering the async block,
                                // and only when persisting backtraces is enabled.
                                let wasm_backtrace = if self.config.backtrace_persist {
                                    let wasm_backtrace =
                                        wasmtime::WasmBacktrace::capture(&store_ctx);
                                    concepts::storage::WasmBacktrace::maybe_from(&wasm_backtrace)
                                } else {
                                    None
                                };

                                // Forward the call to the endpoint context.
                                Box::new(async move {
                                    Ok(store_ctx
                                        .data_mut()
                                        .call_imported_fn(ffqn, params, results, wasm_backtrace)
                                        .await?)
                                })
                            }
                        });
                        if let Err(err) = res {
                            return Err(WasmFileError::linking_error(
                                format!("cannot add mock for imported function {ffqn}"),
                                err,
                            ));
                        }
                    }
                }
                Err(err) => {
                    // Failing to obtain the linker instance is not fatal: the interface is
                    // skipped with a warning.
                    warn!(
                        "Skipping interface {ifc_fqn} - {err:?}",
                        ifc_fqn = import.ifc_fqn
                    );
                }
            }
        }

        let mut config = self.config;
        // For endpoints with a JS configuration, resolve the imports of the JS source
        // against the registry and store them in the configuration.
        if let Some(js_config) = &mut config.js_config {
            js_config.resolved_imports =
                crate::js_imports::resolve_js_imports(&js_config.source, fn_registry)
                    .map_err(|e| crate::WasmFileError::linking_error("JS import resolution", e))?;
        }

        // Pre-instantiate the component and wrap it in the WASI HTTP proxy bindings.
        let proxy_pre = linker
            .instantiate_pre(&self.runnable_component.wasmtime_component)
            .map_err(|err: wasmtime::Error| {
                WasmFileError::linking_error("linking error while creating instantiate_pre", err)
            })?;
        let proxy_pre = Arc::new(ProxyPre::new(proxy_pre).map_err(|err: wasmtime::Error| {
            WasmFileError::linking_error("linking error while creating ProxyPre instance", err)
        })?);

        Ok(WebhookEndpointInstanceLinked {
            config: Arc::new(config),
            proxy_pre,
        })
    }
}
333
/// A linked, pre-instantiated webhook endpoint produced by
/// [`WebhookEndpointCompiled::link`]. Cloning shares the underlying `Arc`s.
#[derive(Clone, derive_more::Debug)]
pub struct WebhookEndpointInstanceLinked {
    /// Pre-instantiated WASI HTTP proxy; omitted from `Debug` output.
    #[debug(skip)]
    proxy_pre: Arc<ProxyPre<WebhookEndpointCtx>>,
    /// Shared endpoint configuration.
    config: Arc<WebhookEndpointConfig>,
}
340impl WebhookEndpointInstanceLinked {
341 #[must_use]
342 pub fn build(
343 &self,
344 log_forwarder_sender: &mpsc::Sender<LogInfoAppendRow>,
345 ) -> WebhookEndpointInstance {
346 let stdout = StdOutputConfigWithSender::new(
347 self.config.forward_stdout,
348 log_forwarder_sender,
349 LogStreamType::StdOut,
350 );
351 let stderr = StdOutputConfigWithSender::new(
352 self.config.forward_stderr,
353 log_forwarder_sender,
354 LogStreamType::StdErr,
355 );
356 WebhookEndpointInstance {
357 proxy_pre: self.proxy_pre.clone(),
358 stdout,
359 stderr,
360 logs_storage_config: self.config.logs_store_min_level.map(|min_level| {
361 LogStrageConfig {
362 min_level,
363 log_sender: log_forwarder_sender.clone(),
364 }
365 }),
366 config: self.config.clone(),
367 }
368 }
369}
370
/// A linked webhook endpoint together with its output and log forwarding setup, as created
/// by [`WebhookEndpointInstanceLinked::build`].
#[derive(Clone, derive_more::Debug)]
pub struct WebhookEndpointInstance {
    /// Pre-instantiated WASI HTTP proxy; omitted from `Debug` output.
    #[debug(skip)]
    proxy_pre: Arc<ProxyPre<WebhookEndpointCtx>>,
    /// Shared endpoint configuration.
    config: Arc<WebhookEndpointConfig>,
    /// Stdout forwarding setup, if any; omitted from `Debug` output.
    #[debug(skip)]
    stdout: Option<StdOutputConfigWithSender>,
    /// Stderr forwarding setup, if any; omitted from `Debug` output.
    #[debug(skip)]
    stderr: Option<StdOutputConfigWithSender>,
    /// Log storage setup; present only when a minimum log level is configured.
    logs_storage_config: Option<LogStrageConfig>,
}
382
/// Path router that keeps one route table per HTTP method plus a method-independent
/// fallback table.
pub struct MethodAwareRouter<T> {
    /// Routes registered for a specific HTTP method.
    method_map: hashbrown::HashMap<Method, Router<T>>,
    /// Routes registered without a method; consulted when the method-specific lookup
    /// finds nothing.
    fallback: Router<T>,
}
387
388impl<T: Clone> MethodAwareRouter<T> {
390 pub fn add(&mut self, method: Option<Method>, route: &str, dest: T) {
391 let route = if route.is_empty() { "/*" } else { route };
392
393 let mut add = |method, route, dest| {
394 if let Some(method) = method {
395 self.method_map.entry(method).or_default().add(route, dest);
396 } else {
397 self.fallback.add(route, dest);
398 }
399 };
400
401 let prefix_with_slash;
402 if let Some(prefix) = route.strip_suffix("/*") {
403 prefix_with_slash = format!("{prefix}/");
405 add(method.clone(), &prefix_with_slash, dest.clone());
406 }
407 add(method, route, dest);
408 }
409}
410
411impl<T> MethodAwareRouter<T> {
412 fn find(&self, method: &Method, path: &Uri) -> Option<Match<&T>> {
413 let path = path.path();
414 self.method_map
415 .get(method)
416 .and_then(|router| router.recognize(path).ok())
417 .or_else(|| self.fallback.recognize(path).ok())
418 }
419}
420
421impl<T> Default for MethodAwareRouter<T> {
422 fn default() -> Self {
423 Self {
424 method_map: hashbrown::HashMap::default(),
425 fallback: Router::default(),
426 }
427 }
428}
429
/// Deployment-specific state of the webhook server: the deployment id together with the
/// router and function registry that belong to it.
pub struct WebhookServerState {
    /// Deployment this state belongs to.
    pub deployment_id: DeploymentId,
    /// Routes mapping method/path to linked webhook endpoints.
    pub router: Arc<MethodAwareRouter<WebhookEndpointInstanceLinked>>,
    /// Registry of the functions available in this deployment.
    pub fn_registry: Arc<dyn FunctionRegistry>,
}
436
/// Accept loop of the webhook HTTP server.
///
/// Every accepted TCP connection is served as HTTP/1 on its own spawned task. The server
/// state (deployment id, router, function registry) is read from the watcher once per
/// connection; when the watched state changes afterwards, the connection is shut down
/// gracefully.
///
/// # Errors
/// Returns [`WebhookServerError::SocketError`] when accepting a connection fails. The loop
/// has no other exit in this function.
#[expect(clippy::too_many_arguments)]
pub async fn server(
    http_server: String,
    listener: TcpListener,
    engine: Arc<Engine>,
    wh_server_state_watcher_parent: WebhookStateWatcher,
    log_forwarder_sender: mpsc::Sender<LogInfoAppendRow>,
    db_pool: Arc<dyn DbPool>,
    clock_fn: Box<dyn ClockFn>,
    sleep: Arc<dyn Sleep>,
    max_inflight_requests: Option<Arc<tokio::sync::Semaphore>>,
    server_termination_watcher: watch::Receiver<()>,
) -> Result<(), WebhookServerError> {
    loop {
        let (stream, _) = listener
            .accept()
            .await
            .map_err(WebhookServerError::SocketError)?;
        let stream_id = format!("{stream:?}");
        let stream = TokioIo::new(stream);

        // Each connection gets its own watcher clone; `borrow_and_update` marks the current
        // state as seen so that `changed()` below only fires for later updates.
        let mut wh_server_state_watcher = wh_server_state_watcher_parent.clone();
        let state = wh_server_state_watcher.borrow_and_update().clone();
        let deployment_id = state.deployment_id;
        debug!(%deployment_id, %stream_id, "Initializing connection");
        tokio::task::spawn(
            {
                // Clone the shared handles that are moved into the connection task.
                let engine = engine.clone();
                let clock_fn = clock_fn.clone_box();
                let sleep = sleep.clone();
                let db_pool = db_pool.clone();
                let http_server = http_server.clone();
                let connection_span = info_span!("connection", %http_server);
                let max_inflight_requests = max_inflight_requests.clone();
                let server_termination_watcher = server_termination_watcher.clone();
                let log_forwarder_sender = log_forwarder_sender.clone();
                async move {
                    // The receiver side is handed to every request handler; the sender is
                    // kept in this task and dropped when the task finishes.
                    let (connection_drop_sender, connection_drop_watcher) = watch::channel(());
                    let deployment_id = state.deployment_id;
                    let mut conn = http1::Builder::new()
                        .serve_connection(
                            stream,
                            hyper::service::service_fn({
                                move |req| {
                                    // Every request is handled under a freshly generated
                                    // top-level execution id.
                                    let execution_id = ExecutionId::generate().get_top_level();
                                    trace!(%execution_id, %deployment_id, method = %req.method(), uri = %req.uri(), "Processing request");
                                    RequestHandler {
                                        deployment_id: state.deployment_id,
                                        engine: engine.clone(),
                                        clock_fn: clock_fn.clone_box(),
                                        sleep: sleep.clone(),
                                        db_pool: db_pool.clone(),
                                        fn_registry: state.fn_registry.clone(),
                                        execution_id,
                                        router: state.router.clone(),
                                        connection_drop_watcher: connection_drop_watcher.clone(),
                                        server_termination_watcher: server_termination_watcher.clone(),
                                        log_forwarder_sender: log_forwarder_sender.clone()
                                    }
                                    .handle_request(req, max_inflight_requests.clone())
                                }.instrument(info_span!(parent: &connection_span, "request", %deployment_id))
                            })
                        );
                    let mut conn = Pin::new(&mut conn);
                    // Drive the connection until it completes, initiating a graceful shutdown
                    // whenever the watched server state changes.
                    let res = loop {
                        select! {
                            result = conn.as_mut() => {
                                break result;
                            }
                            changed = wh_server_state_watcher.changed() => {
                                conn.as_mut().graceful_shutdown();
                                if changed.is_ok() {
                                    // State was replaced: keep polling the connection until
                                    // it finishes.
                                    let new_deployment_id = wh_server_state_watcher.borrow().deployment_id;
                                    debug!(%http_server, "Switching to {new_deployment_id}, gracefully shutting down connection");
                                } else {
                                    // The watch sender is gone; stop selecting on the watcher
                                    // and await the connection directly.
                                    debug!(%http_server, "Deployment watcher dropped, gracefully shutting down connection");
                                    break conn.as_mut().await;
                                }
                            }
                        }
                    };
                    if let Err(err) = res {
                        info!(%http_server, "Error serving connection: {err:?}");
                        drop(connection_drop_sender);
                    }
                }
            }.instrument(debug_span!("tcp stream", %deployment_id, %stream_id))
        );
    }
}
530
/// Configuration of a single webhook endpoint.
#[derive(Debug, Clone)]
pub struct WebhookEndpointConfig {
    /// Identifier of the endpoint's component.
    pub component_id: ComponentId,
    /// Stdout forwarding configuration; `None` leaves stdout forwarding unconfigured.
    pub forward_stdout: Option<StdOutputConfig>,
    /// Stderr forwarding configuration; `None` leaves stderr forwarding unconfigured.
    pub forward_stderr: Option<StdOutputConfig>,
    /// Environment variables configured for the endpoint.
    pub env_vars: Arc<[EnvVar]>,
    /// Optional fuel amount. NOTE(review): presumably the wasmtime fuel budget — confirm
    /// against the code that builds the store.
    pub fuel: Option<u64>,
    /// When `true`, a wasm backtrace is captured on every call of an imported function.
    pub backtrace_persist: bool,
    /// Optional interruption interval passed to the wait for a child execution's result.
    pub subscription_interruption: Option<Duration>,
    /// Minimum level of logs to store; `None` disables log storage.
    pub logs_store_min_level: Option<LogLevel>,
    /// Host allow-list entries for the HTTP request policy.
    pub allowed_hosts: Arc<[crate::http_request_policy::AllowedHostConfig]>,
    /// JS-specific configuration, present only for endpoints that carry JS source.
    pub js_config: Option<WebhookEndpointJsConfig>,
    /// Hint identifying the configuration section, passed on to the HTTP hooks.
    pub config_section_hint: crate::http_hooks::ConfigSectionHint,
}
546
/// JS-specific part of a webhook endpoint configuration.
#[derive(Debug, Clone)]
pub struct WebhookEndpointJsConfig {
    /// JS source code of the endpoint.
    pub source: String,
    /// File name associated with `source`.
    pub file_name: String,
    /// Imports of `source` resolved against the function registry; filled in while the
    /// endpoint is linked.
    pub resolved_imports: HashMap<String, Vec<(String, String)>>,
}
555
/// Store data of a webhook endpoint instance: everything the host functions need while
/// a single HTTP request is being handled.
struct WebhookEndpointCtx {
    /// Component that implements the endpoint.
    component_id: ComponentId,
    /// Deployment the request is handled under.
    deployment_id: DeploymentId,
    /// Clock used for timestamps of created and appended events.
    clock_fn: Box<dyn ClockFn>,
    /// Sleep implementation used while waiting for execution results.
    sleep: Arc<dyn Sleep>,
    /// Database connection pool.
    db_pool: Arc<dyn DbPool>,
    /// Registry used to look up exported functions by their fully qualified name.
    fn_registry: Arc<dyn FunctionRegistry>,
    /// Resource table for wasmtime component resources.
    table: ResourceTable,
    /// WASI context.
    wasi_ctx: WasiCtx,
    /// WASI HTTP context.
    http_ctx: WasiHttpCtx,
    /// Top-level execution id assigned to the HTTP request.
    execution_id: ExecutionIdTopLevel,
    /// Index used for the next join set created by this instance.
    next_join_set_idx: u64,
    /// Version returned by the most recent write to this execution; `None` until the
    /// first write.
    version: Option<Version>,
    /// Logger attached to the component; also provides the tracing span for metadata.
    component_logger: ComponentLogger,
    /// Optional interruption interval used while waiting for a child execution.
    subscription_interruption: Option<Duration>,
    /// Watcher whose sender is owned by the task serving the connection.
    connection_drop_watcher: watch::Receiver<()>,
    /// Watcher used to observe server termination.
    server_termination_watcher: watch::Receiver<()>,
    /// Hooks applied to outgoing HTTP requests.
    http_hooks: HttpHooks,
}
575
/// Stub implementation of the `join-set` resource host functions.
///
/// Every method panics via `unreachable!`: the implementation assumes that a webhook
/// endpoint can never hold a `join-set` resource.
impl HostJoinSet for WebhookEndpointCtx {
    /// Never expected to be called; panics.
    fn id(&mut self, _resource: wasmtime::component::Resource<JoinSetId>) -> String {
        unreachable!("webhook endpoint instances cannot obtain `join-set-id` resource")
    }

    /// Never expected to be called; panics.
    fn submit_delay(
        &mut self,
        _self_: wasmtime::component::Resource<JoinSetId>,
        _timeout: types::obelisk::types::time::ScheduleAt,
    ) -> types::obelisk::types::execution::DelayId {
        unreachable!("webhook endpoint instances cannot obtain `join-set-id` resource")
    }

    /// Never expected to be called; panics.
    fn join_next(
        &mut self,
        _self_: wasmtime::component::Resource<JoinSetId>,
    ) -> Result<(types::obelisk::types::execution::ResponseId, Result<(), ()>), JoinNextError> {
        unreachable!("webhook endpoint instances cannot obtain `join-set-id` resource")
    }

    /// Never expected to be called; panics.
    fn drop(
        &mut self,
        _resource: wasmtime::component::Resource<JoinSetId>,
    ) -> wasmtime::Result<()> {
        unreachable!("webhook endpoint instances cannot obtain `join-set-id` resource")
    }
}
603
604impl ExecutionHost for WebhookEndpointCtx {
605 fn convert_schedule_json_error(
606 &mut self,
607 err: ScheduleJsonErrorTrappable,
608 ) -> wasmtime::Result<types::obelisk::types::execution::ScheduleJsonError> {
609 match err {
610 ScheduleJsonErrorTrappable::Normal(err) => Ok(err),
611 ScheduleJsonErrorTrappable::Trap(err) => Err(err),
612 }
613 }
614}
615
616fn wit_backtrace_to_storage(
617 bt: types::obelisk::types::backtrace::WasmBacktrace,
618) -> concepts::storage::WasmBacktrace {
619 concepts::storage::WasmBacktrace {
620 frames: bt
621 .frames
622 .into_iter()
623 .map(|f| concepts::storage::FrameInfo {
624 module: f.module,
625 func_name: f.func_name,
626 symbols: f
627 .symbols
628 .into_iter()
629 .map(|s| concepts::storage::FrameSymbol {
630 func_name: s.func_name,
631 file: s.file,
632 line: s.line,
633 col: s.col,
634 })
635 .collect(),
636 })
637 .collect(),
638 }
639}
640
641impl WebhookSupportHost for WebhookEndpointCtx {
642 async fn execution_id_generate(
643 &mut self,
644 ) -> wasmtime::Result<types::obelisk::webhook::webhook_support::ExecutionId> {
645 let execution_id = ExecutionId::generate();
646 Ok(types::obelisk::webhook::webhook_support::ExecutionId {
647 id: execution_id.to_string(),
648 })
649 }
650
    /// Returns the execution id of the current webhook request.
    ///
    /// Delegates to `execution_id_current`.
    async fn current_execution_id(
        &mut self,
    ) -> wasmtime::Result<types::obelisk::webhook::webhook_support::ExecutionId> {
        self.execution_id_current().await
    }
657
658 async fn execution_id_current(
659 &mut self,
660 ) -> wasmtime::Result<types::obelisk::webhook::webhook_support::ExecutionId> {
661 Ok(types::obelisk::webhook::webhook_support::ExecutionId {
662 id: self.execution_id.to_string(),
663 })
664 }
665
666 fn convert_get_status_error(
667 &mut self,
668 err: GetStatusErrorTrappable,
669 ) -> wasmtime::Result<types::obelisk::webhook::webhook_support::GetStatusError> {
670 match err {
671 GetStatusErrorTrappable::Normal(err) => Ok(err),
672 GetStatusErrorTrappable::Trap(err) => Err(err),
673 }
674 }
675
676 fn convert_get_error(
677 &mut self,
678 err: GetErrorTrappable,
679 ) -> wasmtime::Result<types::obelisk::webhook::webhook_support::GetError> {
680 match err {
681 GetErrorTrappable::Normal(err) => Ok(err),
682 GetErrorTrappable::Trap(err) => Err(err),
683 }
684 }
685
686 fn convert_try_get_error(
687 &mut self,
688 err: TryGetErrorTrappable,
689 ) -> wasmtime::Result<types::obelisk::webhook::webhook_support::TryGetError> {
690 match err {
691 TryGetErrorTrappable::Normal(err) => Ok(err),
692 TryGetErrorTrappable::Trap(err) => Err(err),
693 }
694 }
695
696 async fn schedule_json(
697 &mut self,
698 execution_id: types::obelisk::webhook::webhook_support::ExecutionId,
699 schedule_at: types::obelisk::webhook::webhook_support::ScheduleAt,
700 function: types::obelisk::webhook::webhook_support::Function,
701 params: String,
702 _config: Option<types::obelisk::webhook::webhook_support::SubmitConfig>,
703 backtrace: Option<types::obelisk::types::backtrace::WasmBacktrace>,
704 ) -> Result<(), ScheduleJsonErrorTrappable> {
705 use types::obelisk::types::execution::ScheduleJsonError;
706
707 let execution_id = match concepts::ExecutionId::from_str(&execution_id.id) {
709 Ok(id) => id,
710 Err(err) => {
711 let msg = format!("schedule-json: invalid execution ID: {err}");
712 self.error(msg.clone());
713 return Err(ScheduleJsonError::FfqnParsingError(msg).into());
714 }
715 };
716
717 let ffqn =
719 match FunctionFqn::try_from_tuple(&function.interface_name, &function.function_name) {
720 Ok(ffqn) => ffqn,
721 Err(err) => {
722 let msg = format!("schedule-json: invalid function name: {err}");
723 self.error(msg.clone());
724 return Err(ScheduleJsonError::FfqnParsingError(msg).into());
725 }
726 };
727
728 let Some((fn_metadata, component_id)) = self.fn_registry.get_by_exported_function(&ffqn)
730 else {
731 self.error("schedule-json: function not found".to_string());
732 return Err(ScheduleJsonError::FunctionNotFound.into());
733 };
734
735 let params_json: Vec<serde_json::Value> = match serde_json::from_str(¶ms) {
737 Ok(serde_json::Value::Array(arr)) => arr,
738 Ok(_) => {
739 let msg = "schedule-json: params must be a JSON array".to_string();
740 self.error(msg.clone());
741 return Err(ScheduleJsonError::TypeCheckError(msg).into());
742 }
743 Err(err) => {
744 let msg = format!("schedule-json: cannot parse params as JSON: {err}");
745 self.error(msg.clone());
746 return Err(ScheduleJsonError::TypeCheckError(msg).into());
747 }
748 };
749
750 let params = match Params::from_json_values(
752 Arc::from(params_json),
753 fn_metadata
754 .parameter_types
755 .iter()
756 .map(|param_type| ¶m_type.type_wrapper),
757 ) {
758 Ok(params) => params,
759 Err(err) => {
760 let msg = format!("schedule-json: params type checking failed: {err}");
761 self.error(msg.clone());
762 return Err(ScheduleJsonError::TypeCheckError(msg).into());
763 }
764 };
765
766 let history_event_schedule_at = schedule_at_from_webhook(schedule_at);
768 let created_at = self.clock_fn.now();
769 let schedule_at = match history_event_schedule_at.as_date_time(created_at) {
770 Ok(dt) => dt,
771 Err(err) => {
772 let msg = format!("schedule-json: invalid schedule-at: {err}");
773 self.error(msg.clone());
774 return Err(ScheduleJsonError::TypeCheckError(msg).into());
775 }
776 };
777
778 let version = match self.get_version_or_create().await {
780 Ok(v) => v,
781 Err(err) => {
782 return Err(wasmtime::Error::msg(format!("database error: {err:?}")).into());
783 }
784 };
785
786 let event = HistoryEvent::Schedule {
787 execution_id: execution_id.clone(),
788 schedule_at: history_event_schedule_at,
789 result: Ok(()),
790 };
791 let append_req = AppendRequest {
792 event: ExecutionRequest::HistoryEvent { event },
793 created_at,
794 };
795 let create_req = CreateRequest {
796 created_at,
797 execution_id: execution_id.clone(),
798 ffqn,
799 params,
800 parent: None,
801 metadata: ExecutionMetadata::from_linked_span(&self.component_logger.span),
802 scheduled_at: schedule_at,
803 component_id: component_id.clone(),
804 deployment_id: self.deployment_id,
805 scheduled_by: Some(ExecutionId::TopLevel(self.execution_id)),
806 paused: false,
807 };
808
809 let db_connection = match self.db_pool.connection().await {
810 Ok(conn) => conn,
811 Err(err) => {
812 return Err(
813 wasmtime::Error::msg(format!("database connection error: {err:?}")).into(),
814 );
815 }
816 };
817
818 let backtrace_infos: Vec<BacktraceInfo> = backtrace
819 .map(|bt| BacktraceInfo {
820 execution_id: ExecutionId::TopLevel(self.execution_id),
821 component_id: self.component_id.clone(),
822 version_min_including: version.clone(),
823 version_max_excluding: Version(version.0 + 1),
824 wasm_backtrace: wit_backtrace_to_storage(bt),
825 })
826 .into_iter()
827 .collect();
828
829 match db_connection
830 .append_batch_create_new_execution(
831 created_at,
832 vec![append_req],
833 ExecutionId::TopLevel(self.execution_id),
834 version.clone(),
835 vec![create_req],
836 backtrace_infos,
837 )
838 .await
839 {
840 Ok(new_version) => {
841 self.version = Some(new_version);
842 Ok(())
843 }
844 Err(err) => Err(wasmtime::Error::msg(format!("database write error: {err:?}")).into()),
845 }
846 }
847
848 async fn call_json(
849 &mut self,
850 function: types::obelisk::webhook::webhook_support::Function,
851 params: String,
852 _config: Option<types::obelisk::webhook::webhook_support::SubmitConfig>,
853 backtrace: Option<types::obelisk::types::backtrace::WasmBacktrace>,
854 ) -> Result<Result<Option<String>, Option<String>>, ScheduleJsonErrorTrappable> {
855 use types::obelisk::types::execution::ScheduleJsonError;
856
857 let ffqn =
859 match FunctionFqn::try_from_tuple(&function.interface_name, &function.function_name) {
860 Ok(ffqn) => ffqn,
861 Err(err) => {
862 let msg = format!("call-json: invalid function name: {err}");
863 self.error(msg.clone());
864 return Err(ScheduleJsonError::FfqnParsingError(msg).into());
865 }
866 };
867
868 let Some((fn_metadata, component_id)) = self.fn_registry.get_by_exported_function(&ffqn)
870 else {
871 self.error("call-json: function not found".to_string());
872 return Err(ScheduleJsonError::FunctionNotFound.into());
873 };
874
875 let params_json: Vec<serde_json::Value> = match serde_json::from_str(¶ms) {
877 Ok(serde_json::Value::Array(arr)) => arr,
878 Ok(_) => {
879 let msg = "call-json: params must be a JSON array".to_string();
880 self.error(msg.clone());
881 return Err(ScheduleJsonError::TypeCheckError(msg).into());
882 }
883 Err(err) => {
884 let msg = format!("call-json: cannot parse params as JSON: {err}");
885 self.error(msg.clone());
886 return Err(ScheduleJsonError::TypeCheckError(msg).into());
887 }
888 };
889
890 let params = match Params::from_json_values(
892 Arc::from(params_json),
893 fn_metadata
894 .parameter_types
895 .iter()
896 .map(|param_type| ¶m_type.type_wrapper),
897 ) {
898 Ok(params) => params,
899 Err(err) => {
900 let msg = format!("call-json: params type checking failed: {err}");
901 self.error(msg.clone());
902 return Err(ScheduleJsonError::TypeCheckError(msg).into());
903 }
904 };
905
906 let version = match self.get_version_or_create().await {
908 Ok(v) => v,
909 Err(err) => {
910 return Err(wasmtime::Error::msg(format!("database error: {err:?}")).into());
911 }
912 };
913
914 let (join_set_id, child_execution_id) = self.create_oneoff_join_set();
916
917 let created_at = self.clock_fn.now();
918
919 let req_join_set_created = AppendRequest {
921 created_at,
922 event: ExecutionRequest::HistoryEvent {
923 event: HistoryEvent::JoinSetCreate {
924 join_set_id: join_set_id.clone(),
925 },
926 },
927 };
928
929 let req_child_exec = AppendRequest {
931 created_at,
932 event: ExecutionRequest::HistoryEvent {
933 event: HistoryEvent::JoinSetRequest {
934 join_set_id: join_set_id.clone(),
935 request: JoinSetRequest::ChildExecutionRequest {
936 child_execution_id: child_execution_id.clone(),
937 target_ffqn: ffqn.clone(),
938 params: params.clone(),
939 result: Ok(()),
940 },
941 },
942 },
943 };
944
945 let req_join_next = AppendRequest {
947 created_at,
948 event: ExecutionRequest::HistoryEvent {
949 event: HistoryEvent::JoinNext {
950 join_set_id: join_set_id.clone(),
951 run_expires_at: created_at,
952 closing: false,
953 requested_ffqn: Some(ffqn.clone()),
954 },
955 },
956 };
957
958 let req_create_child = CreateRequest {
960 created_at,
961 execution_id: ExecutionId::Derived(child_execution_id.clone()),
962 ffqn,
963 params,
964 parent: Some((ExecutionId::TopLevel(self.execution_id), join_set_id)),
965 metadata: ExecutionMetadata::from_parent_span(&self.component_logger.span),
966 scheduled_at: created_at,
967 component_id: component_id.clone(),
968 deployment_id: self.deployment_id,
969 scheduled_by: None,
970 paused: false,
971 };
972
973 let db_connection = match self.db_pool.connection().await {
974 Ok(conn) => conn,
975 Err(err) => {
976 return Err(
977 wasmtime::Error::msg(format!("database connection error: {err:?}")).into(),
978 );
979 }
980 };
981
982 let appended = vec![req_join_set_created, req_child_exec, req_join_next];
983
984 let backtrace_infos: Vec<BacktraceInfo> = backtrace
985 .map(|bt| BacktraceInfo {
986 execution_id: ExecutionId::TopLevel(self.execution_id),
987 component_id: self.component_id.clone(),
988 version_min_including: version.clone(),
989 version_max_excluding: Version(version.0 + 3),
990 wasm_backtrace: wit_backtrace_to_storage(bt),
991 })
992 .into_iter()
993 .collect();
994
995 match db_connection
996 .append_batch_create_new_execution(
997 created_at,
998 appended,
999 ExecutionId::TopLevel(self.execution_id),
1000 version,
1001 vec![req_create_child],
1002 backtrace_infos,
1003 )
1004 .await
1005 {
1006 Ok(new_version) => {
1007 self.version = Some(new_version);
1008 }
1009 Err(err) => {
1010 return Err(wasmtime::Error::msg(format!("database write error: {err:?}")).into());
1011 }
1012 }
1013
1014 let result = Self::wait_for_finished_result(
1016 self.subscription_interruption,
1017 &self.sleep,
1018 db_connection.as_ref(),
1019 &ExecutionId::Derived(child_execution_id),
1020 &self.connection_drop_watcher,
1021 &self.server_termination_watcher,
1022 )
1023 .await;
1024
1025 match result {
1026 Ok(retval) => Ok(supported_return_value_to_json_result(&retval)),
1027 Err(err) => Err(wasmtime::Error::msg(format!("execution error: {err:?}")).into()),
1028 }
1029 }
1030
1031 async fn get_status(
1032 &mut self,
1033 execution_id: types::obelisk::webhook::webhook_support::ExecutionId,
1034 _backtrace: Option<types::obelisk::types::backtrace::WasmBacktrace>,
1035 ) -> Result<types::obelisk::webhook::webhook_support::ExecutionStatus, GetStatusErrorTrappable>
1036 {
1037 use types::obelisk::webhook::webhook_support::{
1038 ExecutionStatus, ExecutionStatusFinished, GetStatusError,
1039 };
1040
1041 let execution_id = match concepts::ExecutionId::from_str(&execution_id.id) {
1043 Ok(id) => id,
1044 Err(err) => {
1045 let msg = format!("get-status: cannot parse execution ID: {err}");
1046 self.error(msg.clone());
1047 return Err(GetStatusError::ExecutionIdParsingError(msg).into());
1048 }
1049 };
1050
1051 let db_connection = match self.db_pool.connection().await {
1053 Ok(conn) => conn,
1054 Err(err) => {
1055 return Err(
1056 wasmtime::Error::msg(format!("database connection error: {err:?}")).into(),
1057 );
1058 }
1059 };
1060
1061 let execution_with_state = match db_connection.get_pending_state(&execution_id).await {
1062 Ok(state) => state,
1063 Err(DbErrorRead::NotFound) => {
1064 return Err(GetStatusError::NotFound.into());
1065 }
1066 Err(err) => {
1067 return Err(wasmtime::Error::msg(format!("database read error: {err:?}")).into());
1068 }
1069 };
1070
1071 let status = match execution_with_state.pending_state {
1073 PendingState::PendingAt(state) => {
1074 ExecutionStatus::PendingAt(types::obelisk::types::time::Datetime {
1075 seconds: u64::try_from(state.scheduled_at.timestamp())
1076 .expect("pending at before unix epoch is unsupported"),
1077 nanoseconds: state.scheduled_at.timestamp_subsec_nanos(),
1078 })
1079 }
1080 PendingState::Locked(_) => ExecutionStatus::Locked,
1081 PendingState::BlockedByJoinSet(_) => ExecutionStatus::BlockedByJoinSet,
1082 PendingState::Paused(_) => ExecutionStatus::Paused,
1083 PendingState::Finished(finished) => {
1084 let finished_status = match finished.result_kind {
1085 PendingStateFinishedResultKind::Ok => ExecutionStatusFinished::Ok,
1086 PendingStateFinishedResultKind::Err(PendingStateFinishedError::Error) => {
1087 ExecutionStatusFinished::Err
1088 }
1089 PendingStateFinishedResultKind::Err(
1090 PendingStateFinishedError::ExecutionFailure(_),
1091 ) => ExecutionStatusFinished::ExecutionFailure,
1092 };
1093 ExecutionStatus::Finished(finished_status)
1094 }
1095 };
1096
1097 Ok(status)
1098 }
1099
1100 async fn get(
1101 &mut self,
1102 execution_id: types::obelisk::webhook::webhook_support::ExecutionId,
1103 _backtrace: Option<types::obelisk::types::backtrace::WasmBacktrace>,
1104 ) -> Result<Result<Option<String>, Option<String>>, GetErrorTrappable> {
1105 use types::obelisk::webhook::webhook_support::GetError;
1106
1107 let parsed_execution_id: concepts::ExecutionId =
1109 match concepts::ExecutionId::from_str(&execution_id.id) {
1110 Ok(id) => id,
1111 Err(err) => {
1112 let msg = format!("get: cannot parse execution ID: {err}");
1113 self.error(msg.clone());
1114 return Err(GetError::ExecutionIdParsingError(msg).into());
1115 }
1116 };
1117
1118 let db_connection = match self.db_pool.connection().await {
1120 Ok(conn) => conn,
1121 Err(err) => {
1122 return Err(
1123 wasmtime::Error::msg(format!("database connection error: {err:?}")).into(),
1124 );
1125 }
1126 };
1127
1128 let subscription_interruption = self.subscription_interruption;
1130 let sleep = self.sleep.clone();
1131 let connection_drop_watcher = self.connection_drop_watcher.clone();
1132 let server_termination_watcher = self.server_termination_watcher.clone();
1133
1134 let result = Self::wait_for_finished_result(
1136 subscription_interruption,
1137 &sleep,
1138 db_connection.as_ref(),
1139 &parsed_execution_id,
1140 &connection_drop_watcher,
1141 &server_termination_watcher,
1142 )
1143 .await;
1144
1145 match result {
1146 Ok(retval) => Ok(supported_return_value_to_json_result(&retval)),
1147 Err(err) => Err(wasmtime::Error::msg(format!("execution error: {err:?}")).into()),
1148 }
1149 }
1150
1151 async fn try_get(
1152 &mut self,
1153 execution_id: types::obelisk::webhook::webhook_support::ExecutionId,
1154 _backtrace: Option<types::obelisk::types::backtrace::WasmBacktrace>,
1155 ) -> Result<Result<Option<String>, Option<String>>, TryGetErrorTrappable> {
1156 use types::obelisk::webhook::webhook_support::TryGetError;
1157
1158 let parsed_execution_id: concepts::ExecutionId =
1160 match concepts::ExecutionId::from_str(&execution_id.id) {
1161 Ok(id) => id,
1162 Err(err) => {
1163 let msg = format!("try-get: cannot parse execution ID: {err}");
1164 self.error(msg.clone());
1165 return Err(TryGetError::ExecutionIdParsingError(msg).into());
1166 }
1167 };
1168
1169 let db_connection = match self.db_pool.connection().await {
1171 Ok(conn) => conn,
1172 Err(err) => {
1173 return Err(
1174 wasmtime::Error::msg(format!("database connection error: {err:?}")).into(),
1175 );
1176 }
1177 };
1178
1179 let execution_with_state = match db_connection.get_pending_state(&parsed_execution_id).await
1181 {
1182 Ok(state) => state,
1183 Err(DbErrorRead::NotFound) => {
1184 self.error(format!("try-get: execution not found: {}", execution_id.id));
1185 return Err(TryGetError::NotFound.into());
1186 }
1187 Err(err) => {
1188 return Err(wasmtime::Error::msg(format!("database read error: {err:?}")).into());
1189 }
1190 };
1191
1192 match execution_with_state.pending_state {
1194 PendingState::Finished(_) => {
1195 match db_connection
1197 .wait_for_finished_result(&parsed_execution_id, None)
1198 .await
1199 {
1200 Ok(retval) => Ok(supported_return_value_to_json_result(&retval)),
1201 Err(err) => {
1202 Err(wasmtime::Error::msg(format!("database read error: {err:?}")).into())
1203 }
1204 }
1205 }
1206 _ => Err(TryGetError::NotFinishedYet.into()),
1207 }
1208 }
1209}
1210
#[derive(thiserror::Error, Debug, Clone)]
/// Error type returned by `WebhookEndpointCtx::call_imported_fn` and
/// `WebhookEndpointCtx::wait_for_finished_result`.
enum WebhookEndpointFunctionError {
    /// A database operation failed. Read errors and generic database errors are
    /// converted into `DbErrorWrite` before being stored here.
    #[error(transparent)]
    DbError(#[from] DbErrorWrite),
    /// Wraps a `FinishedExecutionFailure`; displayed transparently.
    #[error(transparent)]
    FinishedExecutionFailure(#[from] FinishedExecutionFailure),
    /// Any other failure, described by a static message.
    #[error("uncategorized error: {0}")]
    UncategorizedError(&'static str),
    /// Returned when a cancellation is detected through the connection-drop or
    /// server-termination watcher.
    #[error("connection closed")]
    ConnectionClosed,
}
1223impl From<DbErrorGeneric> for WebhookEndpointFunctionError {
1224 fn from(value: DbErrorGeneric) -> Self {
1225 WebhookEndpointFunctionError::DbError(DbErrorWrite::Generic(value))
1226 }
1227}
1228
/// Declares that host bindings linked against `WebhookEndpointCtx` operate on a mutable
/// borrow of the context itself (see the identity projections passed in `add_to_linker`).
impl wasmtime::component::HasData for WebhookEndpointCtx {
    type Data<'a> = &'a mut WebhookEndpointCtx;
}
1232
1233impl WebhookEndpointCtx {
1234 async fn get_version_or_create(&mut self) -> Result<Version, DbErrorWrite> {
1236 if let Some(found) = &self.version {
1237 return Ok(found.clone());
1238 }
1239 let created_at = self.clock_fn.now();
1240 let metadata = concepts::ExecutionMetadata::from_parent_span(&self.component_logger.span);
1242 let create_request = CreateRequest {
1243 created_at,
1244 execution_id: ExecutionId::TopLevel(self.execution_id),
1245 ffqn: HTTP_HANDLER_FFQN,
1246 params: Params::empty(),
1247 parent: None,
1248 metadata,
1249 scheduled_at: created_at,
1250 component_id: self.component_id.clone(),
1251 deployment_id: self.deployment_id,
1252 scheduled_by: None,
1253 paused: false,
1254 };
1255 let conn = self.db_pool.connection().await?;
1256 let version = conn.create(create_request).await?;
1257 self.version = Some(version.clone());
1258 Ok(version)
1259 }
1260
1261 fn create_oneoff_join_set(&mut self) -> (JoinSetId, ExecutionIdDerived) {
1263 let join_set_id = JoinSetId::new(
1264 JoinSetKind::OneOff,
1265 StrVariant::from(self.next_join_set_idx.to_string()),
1266 )
1267 .expect("numeric names must be allowed");
1268 self.next_join_set_idx += 1;
1269 let child_execution_id = ExecutionId::TopLevel(self.execution_id).next_level(&join_set_id);
1270 (join_set_id, child_execution_id)
1271 }
1272
    /// Handles a guest call to an imported function.
    ///
    /// Two kinds of imports are served:
    /// * **`-schedule` extension** (the package name carries the schedule suffix and the
    ///   function name ends with `SUFFIX_FN_SCHEDULE`): the first parameter is interpreted
    ///   as `schedule-at`, a new top-level execution of the target function is created,
    ///   and its execution ID is written to `results[0]`. The call does not wait for the
    ///   scheduled execution.
    /// * **direct call** (everything else): a one-off join set and a child execution are
    ///   created, the call waits until the child finishes, and the child's return value is
    ///   written to `results[0]`.
    ///
    /// In both cases the webhook's own execution is created lazily via
    /// `get_version_or_create`, and the history events plus the new execution are written
    /// in a single `append_batch_create_new_execution` call.
    ///
    /// # Errors
    /// * `ConnectionClosed` when a cancellation is detected before starting or while
    ///   waiting for the child.
    /// * `UncategorizedError` for malformed `-schedule` calls (no parameters, wrong first
    ///   parameter, unknown extension function, `schedule-at` conversion failure).
    /// * `DbError` for database failures.
    ///
    /// # Panics
    /// Panics if `results` does not have exactly one slot, if the target function is not
    /// found in `fn_registry`, if a direct-call target has an extension or a return type
    /// that is not `ReturnType::Extendable`, or if the version returned by the database
    /// differs from the expected one.
    #[instrument(skip_all, fields(%ffqn, version, %execution_id = self.execution_id))]
    async fn call_imported_fn(
        &mut self,
        ffqn: FunctionFqn,
        params: &[Val],
        results: &mut [Val],
        wasm_backtrace: Option<concepts::storage::WasmBacktrace>,
    ) -> Result<(), WebhookEndpointFunctionError> {
        trace!(?params, "call_imported_fn start");
        assert_eq!(
            1,
            results.len(),
            "direct call: no-ext export must return `result`, -schedule returns `execuiton-id`"
        );

        // An error from either watcher's `has_changed` is treated as a cancellation
        // request; bail out before touching the database.
        if self.connection_drop_watcher.has_changed().is_err()
            || self.server_termination_watcher.has_changed().is_err()
        {
            debug!("Cancellation request detected");
            return Err(WebhookEndpointFunctionError::ConnectionClosed);
        }

        if let Some(package_name) = ffqn.ifc_fqn.package_strip_obelisk_schedule_suffix() {
            // Rebuild the interface name with the suffix-stripped package name.
            let ifc_fqn = IfcFqnName::from_parts(
                ffqn.ifc_fqn.namespace(),
                package_name,
                ffqn.ifc_fqn.ifc_name(),
                ffqn.ifc_fqn.version(),
            );
            if let Some(function_name) = ffqn.function_name.strip_suffix(SUFFIX_FN_SCHEDULE) {
                // From here on `ffqn` names the target function, not the extension.
                let ffqn =
                    FunctionFqn::new_arc(Arc::from(ifc_fqn.to_string()), Arc::from(function_name));
                debug!("Got `-schedule` extension for {ffqn}");
                // The first parameter is `schedule-at`; the rest are forwarded to the target.
                let Some((schedule_at, params)) = params.split_first() else {
                    error!(
                        "Error running `-schedule` extension function: exepcted at least one parameter of type `schedule-at`, got empty parameter list"
                    );
                    return Err(WebhookEndpointFunctionError::UncategorizedError(
                        "error running `-schedule` extension function: exepcted at least one parameter of type `schedule-at`, got empty parameter list",
                    ));
                };
                let schedule_at =
                    WastVal::try_from(schedule_at.clone()).map_err(|err| {
                        error!("Error running `-schedule` extension function: cannot convert to internal representation - {err:?}");
                        WebhookEndpointFunctionError::UncategorizedError(
                            "error running `-schedule` extension function: cannot convert to internal representation",
                        )
                    })?;
                let schedule_at = match history_event_schedule_at_from_wast_val(&schedule_at) {
                    Ok(ok) => ok,
                    Err(err) => {
                        error!(
                            "Wrong type for the first `-schedule` extension function parameter, expected `schedule-at`, got `{schedule_at:?}` - {err:?}"
                        );
                        return Err(WebhookEndpointFunctionError::UncategorizedError(
                            "error running `-schedule` extension function: wrong first parameter type",
                        ));
                    }
                };
                let version = self.get_version_or_create().await?;
                // Fill in the `version` span field declared in `#[instrument]`.
                let span = Span::current();
                span.record("version", tracing::field::display(&version));
                let new_execution_id = ExecutionId::generate();
                let (_function_metadata, child_component_id) = self
                    .fn_registry
                    .get_by_exported_function(&ffqn)
                    .expect("target function must be found in fn_registry");
                let created_at = self.clock_fn.now();

                // The history event stores the `schedule-at` value as given; the
                // concrete date-time is resolved against `created_at` just below.
                let event = HistoryEvent::Schedule {
                    execution_id: new_execution_id.clone(),
                    schedule_at,
                    result: Ok(()),
                };
                let schedule_at = schedule_at.as_date_time(created_at).map_err(|_err| {
                    WebhookEndpointFunctionError::UncategorizedError("schedule-at conversion error")
                })?;
                let child_exec_req = AppendRequest {
                    event: ExecutionRequest::HistoryEvent { event },
                    created_at,
                };

                // The scheduled execution has no parent; the link back to this webhook
                // execution is kept in `scheduled_by`.
                let create_child_req = CreateRequest {
                    created_at,
                    execution_id: new_execution_id.clone(),
                    ffqn,
                    params: Params::from_wasmtime(Arc::from(params)),
                    parent: None,
                    metadata: ExecutionMetadata::from_linked_span(&self.component_logger.span),
                    scheduled_at: schedule_at,
                    component_id: child_component_id.clone(),
                    deployment_id: self.deployment_id,
                    scheduled_by: Some(ExecutionId::TopLevel(self.execution_id)),
                    paused: false,
                };
                let db_connection = self.db_pool.connection().await?;
                // One event is appended, so the version is expected to advance by one.
                let expected_next_version = version.increment();
                let backtrace_info = wasm_backtrace.map(|wasm_backtrace| BacktraceInfo {
                    execution_id: ExecutionId::TopLevel(self.execution_id),
                    component_id: self.component_id.clone(),
                    version_min_including: version.clone(),
                    version_max_excluding: expected_next_version.clone(),
                    wasm_backtrace,
                });
                let version = db_connection
                    .append_batch_create_new_execution(
                        created_at,
                        vec![child_exec_req],
                        ExecutionId::TopLevel(self.execution_id),
                        version.clone(),
                        vec![create_child_req],
                        backtrace_info.into_iter().collect(),
                    )
                    .await?;
                assert_eq!(version, expected_next_version);
                self.version = Some(version.clone());
                results[0] = execution_id_into_val(&new_execution_id);
            } else {
                error!("unrecognized `{SUFFIX_PKG_SCHEDULE}` extension function {ffqn}");
                return Err(WebhookEndpointFunctionError::UncategorizedError(
                    "unrecognized extension function",
                ));
            }
        } else {
            // Direct call: create a child execution and wait for its result.
            let version = self.get_version_or_create().await?;
            // Fill in the `version` span field declared in `#[instrument]`.
            let span = Span::current();
            span.record("version", tracing::field::display(&version));
            let (join_set_id_direct, child_execution_id) = self.create_oneoff_join_set();
            let created_at = self.clock_fn.now();
            let (fn_metadata, child_component_id) = self
                .fn_registry
                .get_by_exported_function(&ffqn)
                .expect("import was mocked using fn_registry exports limited to -schedule and no-ext functions");
            assert!(
                fn_metadata.extension.is_none(),
                "direct call: function must be no-ext"
            );
            let return_type_tl = assert_matches!(fn_metadata.return_type, ReturnType::Extendable(compatible) => compatible.type_wrapper_tl);

            // Three history events are written for this call: join set creation, the
            // child execution request, and the join-next.
            let req_join_set_created = AppendRequest {
                created_at,
                event: ExecutionRequest::HistoryEvent {
                    event: HistoryEvent::JoinSetCreate {
                        join_set_id: join_set_id_direct.clone(),
                    },
                },
            };
            let params = Params::from_wasmtime(Arc::from(params));
            let req_child_exec = AppendRequest {
                created_at,
                event: ExecutionRequest::HistoryEvent {
                    event: HistoryEvent::JoinSetRequest {
                        join_set_id: join_set_id_direct.clone(),
                        request: JoinSetRequest::ChildExecutionRequest {
                            child_execution_id: child_execution_id.clone(),
                            target_ffqn: ffqn.clone(),
                            params: params.clone(),
                            result: Ok(()),
                        },
                    },
                },
            };
            let req_join_next = AppendRequest {
                created_at,
                event: ExecutionRequest::HistoryEvent {
                    event: HistoryEvent::JoinNext {
                        join_set_id: join_set_id_direct.clone(),
                        run_expires_at: created_at,
                        closing: false,
                        requested_ffqn: Some(ffqn.clone()),
                    },
                },
            };
            let req_create_child = CreateRequest {
                created_at,
                execution_id: ExecutionId::Derived(child_execution_id.clone()),
                ffqn: ffqn.clone(),
                params,
                parent: Some((ExecutionId::TopLevel(self.execution_id), join_set_id_direct)),
                metadata: ExecutionMetadata::from_parent_span(&self.component_logger.span),
                scheduled_at: created_at,
                component_id: child_component_id.clone(),
                deployment_id: self.deployment_id,
                scheduled_by: None,
                paused: false,
            };
            let db_connection = self.db_pool.connection().await?;
            let appended = vec![req_join_set_created, req_child_exec, req_join_next];
            // Three events are appended, so the version is expected to advance by three.
            let expected_next_version = Version(version.0 + 3);
            let backtrace_info = wasm_backtrace.map(|wasm_backtrace| BacktraceInfo {
                execution_id: ExecutionId::TopLevel(self.execution_id),
                component_id: self.component_id.clone(),
                version_min_including: version.clone(),
                version_max_excluding: expected_next_version.clone(),
                wasm_backtrace,
            });

            let version = db_connection
                .append_batch_create_new_execution(
                    created_at,
                    appended,
                    ExecutionId::TopLevel(self.execution_id),
                    version,
                    vec![req_create_child],
                    backtrace_info.into_iter().collect(),
                )
                .await?;
            assert_eq!(version, expected_next_version);
            self.version = Some(version);

            // Wait for the child to finish; a cancellation surfaces as `ConnectionClosed`.
            let res = Self::wait_for_finished_result(
                self.subscription_interruption,
                &self.sleep,
                db_connection.as_ref(),
                &ExecutionId::Derived(child_execution_id),
                &self.connection_drop_watcher,
                &self.server_termination_watcher,
            )
            .await?;
            results[0] = res.into_wast_val(move || return_type_tl).as_val();

            trace!(?results, "call_imported_fn finish");
        }
        Ok(())
    }
1501
1502 async fn wait_for_finished_result(
1503 subscription_interruption: Option<Duration>,
1504 sleep: &Arc<dyn Sleep>,
1505 db_connection: &dyn DbConnection,
1506 execution_id: &ExecutionId,
1507 connection_drop_watcher: &watch::Receiver<()>,
1508 server_termination_watcher: &watch::Receiver<()>,
1509 ) -> Result<SupportedFunctionReturnValue, WebhookEndpointFunctionError> {
1510 let timeout_factory = move || {
1511 let subscription_interruption = subscription_interruption.unwrap_or(Duration::MAX);
1512 let sleep = sleep.clone();
1513 let mut connection_drop_watcher = connection_drop_watcher.clone();
1514 let mut server_termination_watcher = server_termination_watcher.clone();
1515 Box::pin(async move {
1516 select! {
1517 () = sleep.sleep(subscription_interruption) => TimeoutOutcome::Timeout,
1518 _ = connection_drop_watcher.changed() => TimeoutOutcome::Cancel,
1519 _ = server_termination_watcher.changed() => TimeoutOutcome::Cancel,
1520 }
1521 })
1522 };
1523
1524 loop {
1525 let timeout = timeout_factory();
1526 let res = db_connection
1527 .wait_for_finished_result(execution_id, Some(timeout))
1528 .await;
1529 match res {
1530 Ok(ok) => {
1531 trace!("Finished ok");
1532 return Ok(ok);
1533 }
1534 Err(DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout)) => {
1535 trace!("Timeout triggers resubscribing");
1536 }
1537 Err(DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Cancel)) => {
1538 debug!("Connection closed, not waiting for result");
1539 return Err(WebhookEndpointFunctionError::ConnectionClosed);
1540 }
1541 Err(DbErrorReadWithTimeout::DbErrorRead(err)) => {
1542 warn!("Database error: {err:?}");
1543 return Err(WebhookEndpointFunctionError::from(DbErrorWrite::from(err)));
1544 }
1545 }
1546 }
1547 }
1548
1549 fn add_to_linker(linker: &mut Linker<WebhookEndpointCtx>) -> Result<(), WasmFileError> {
1550 log_activities::obelisk::log::log::add_to_linker::<_, WebhookEndpointCtx>(linker, |x| x)
1552 .map_err(|err| WasmFileError::linking_error("cannot link log activities", err))?;
1553 types::obelisk::types::execution::add_to_linker::<_, WebhookEndpointCtx>(linker, |x| x)
1555 .map_err(|err| WasmFileError::linking_error("cannot link obelisk:types", err))?;
1556 types::obelisk::webhook::webhook_support::add_to_linker::<_, WebhookEndpointCtx>(
1558 linker,
1559 |x| x,
1560 )
1561 .map_err(|err| WasmFileError::linking_error("cannot link obelisk:webhook", err))?;
1562 Ok(())
1563 }
1564
1565 #[must_use]
1566 #[expect(clippy::too_many_arguments)]
1567 fn new<'a>(
1568 deployment_id: DeploymentId,
1569 config: &Arc<WebhookEndpointConfig>,
1570 engine: &Engine,
1571 clock_fn: Box<dyn ClockFn>,
1572 sleep: Arc<dyn Sleep>,
1573 db_pool: Arc<dyn DbPool>,
1574 fn_registry: Arc<dyn FunctionRegistry>,
1575 params: impl Iterator<Item = (&'a str, &'a str)>,
1576 execution_id: ExecutionIdTopLevel,
1577 request_span: Span,
1578 connection_drop_watcher: watch::Receiver<()>,
1579 server_termination_watcher: watch::Receiver<()>,
1580 stdout: Option<StdOutput>,
1581 stderr: Option<StdOutput>,
1582 run_id: RunId,
1583 logs_storage_config: Option<LogStrageConfig>,
1584 ) -> Store<WebhookEndpointCtx> {
1585 let mut wasi_ctx = WasiCtxBuilder::new();
1586 if let Some(stdout) = stdout {
1587 let stdout = LogStream::new(
1588 format!(
1589 "[{component_id} {execution_id} stdout]",
1590 component_id = config.component_id
1591 ),
1592 stdout,
1593 );
1594 wasi_ctx.stdout(stdout);
1595 }
1596 if let Some(stderr) = stderr {
1597 let stderr = LogStream::new(
1598 format!(
1599 "[{component_id} {execution_id} stderr]",
1600 component_id = config.component_id
1601 ),
1602 stderr,
1603 );
1604 wasi_ctx.stderr(stderr);
1605 }
1606 for env_var in config.env_vars.as_ref() {
1607 wasi_ctx.env(&env_var.key, &env_var.val);
1608 }
1609 if let Some(js_config) = &config.js_config {
1610 wasi_ctx.env("__OBELISK_JS_SOURCE__", &js_config.source);
1611 wasi_ctx.env("__OBELISK_JS_FILE_NAME__", &js_config.file_name);
1612 if !js_config.resolved_imports.is_empty() {
1613 let imports_json = serde_json::to_string(&js_config.resolved_imports)
1614 .expect("resolved imports must be serializable");
1615 wasi_ctx.env("__OBELISK_RESOLVED_IMPORTS__", &imports_json);
1616 }
1617 }
1618
1619 let http_policy =
1621 crate::policy_builder::build_http_policy(&config.allowed_hosts, &mut wasi_ctx);
1622
1623 for (key, val) in params {
1624 wasi_ctx.env(key, val);
1625 }
1626 let wasi_ctx = wasi_ctx.build();
1627 let component_logger = ComponentLogger {
1628 span: request_span,
1629 execution_id: ExecutionId::TopLevel(execution_id),
1630 run_id,
1631 logs_storage_config,
1632 };
1633 let ctx = WebhookEndpointCtx {
1635 clock_fn: clock_fn.clone_box(),
1636 sleep,
1637 db_pool,
1638 fn_registry,
1639 table: ResourceTable::new(),
1640 wasi_ctx,
1641 http_ctx: WasiHttpCtx::new(),
1642 version: None,
1643 component_id: config.component_id.clone(),
1644 deployment_id,
1645 next_join_set_idx: JOIN_SET_START_IDX,
1646 execution_id,
1647 component_logger: component_logger.clone(),
1648 subscription_interruption: config.subscription_interruption,
1649 connection_drop_watcher,
1650 server_termination_watcher,
1651 http_hooks: HttpHooks {
1652 clock_fn,
1653 http_client_traces: HttpClientTracesContainer::default(),
1654 http_policy,
1655 component_logger,
1656 config_section_hint: config.config_section_hint,
1657 },
1658 };
1659 let mut store = Store::new(engine, ctx);
1660
1661 if let Some(fuel) = config.fuel {
1663 store
1664 .set_fuel(fuel)
1665 .expect("engine must have `consume_fuel` enabled");
1666 }
1667
1668 store.epoch_deadline_callback(|_store_ctx| Ok(UpdateDeadline::Yield(1)));
1670 store
1671 }
1672
1673 async fn close(
1674 self,
1675 original_result: wasmtime::Result<()>,
1676 assigned_fuel: Option<u64>,
1677 ) -> wasmtime::Result<()> {
1678 #[derive(Debug, thiserror::Error)]
1679 #[error("webhook {trap_kind}: {reason}")]
1680 struct WebhookTrap {
1681 reason: String,
1682 trap_kind: TrapKind,
1683 detail: Option<String>,
1684 }
1685
1686 let result = match &original_result {
1687 Ok(()) => SUPPORTED_RETURN_VALUE_OK_EMPTY,
1688 Err(err) => {
1689 let err = if let Some(trap) = err
1690 .source()
1691 .and_then(|source| source.downcast_ref::<wasmtime::Trap>())
1692 {
1693 if *trap == wasmtime::Trap::OutOfFuel {
1694 WebhookTrap {
1695 reason: format!(
1696 "total fuel consumed: {}",
1697 assigned_fuel
1698 .expect("must have been set as it was the reason of trap")
1699 ),
1700 detail: None,
1701 trap_kind: TrapKind::OutOfFuel,
1702 }
1703 } else {
1704 WebhookTrap {
1705 reason: trap.to_string(),
1706 detail: Some(format!("{err:?}")),
1707 trap_kind: TrapKind::Trap,
1708 }
1709 }
1710 } else {
1711 WebhookTrap {
1712 reason: err.to_string(),
1713 trap_kind: TrapKind::HostFunctionError,
1714 detail: Some(format!("{err:?}")),
1715 }
1716 };
1717
1718 SupportedFunctionReturnValue::ExecutionFailure(FinishedExecutionFailure {
1719 reason: Some(err.to_string()),
1720 kind: ExecutionFailureKind::Uncategorized,
1721 detail: err.detail,
1722 })
1723 }
1724 };
1725 if let Some(version) = self.version {
1726 let http_client_traces = Some(
1727 self.http_hooks
1728 .http_client_traces
1729 .into_iter()
1730 .map(|(req, mut resp)| HttpClientTrace {
1731 req,
1732 resp: resp.try_recv().ok(),
1733 })
1734 .collect(),
1735 );
1736 self.db_pool
1737 .connection()
1738 .await?
1739 .append(
1740 ExecutionId::TopLevel(self.execution_id),
1741 version,
1742 AppendRequest {
1743 created_at: self.clock_fn.now(),
1744 event: ExecutionRequest::Finished {
1745 retval: result,
1746 http_client_traces,
1747 },
1748 },
1749 )
1750 .await?;
1751 }
1752 original_result
1753 }
1754}
1755
/// Guest-facing logging interface: every method forwards the message to the
/// component logger with the matching `LogLevel`.
impl log_activities::obelisk::log::log::Host for WebhookEndpointCtx {
    /// Logs `message` at `LogLevel::Trace`.
    fn trace(&mut self, message: String) {
        self.component_logger.log(LogLevel::Trace, message);
    }

    /// Logs `message` at `LogLevel::Debug`.
    fn debug(&mut self, message: String) {
        self.component_logger.log(LogLevel::Debug, message);
    }

    /// Logs `message` at `LogLevel::Info`.
    fn info(&mut self, message: String) {
        self.component_logger.log(LogLevel::Info, message);
    }

    /// Logs `message` at `LogLevel::Warn`.
    fn warn(&mut self, message: String) {
        self.component_logger.log(LogLevel::Warn, message);
    }

    /// Logs `message` at `LogLevel::Error`.
    fn error(&mut self, message: String) {
        self.component_logger.log(LogLevel::Error, message);
    }
}
1777
1778impl WasiView for WebhookEndpointCtx {
1779 fn ctx(&mut self) -> WasiCtxView<'_> {
1780 WasiCtxView {
1781 ctx: &mut self.wasi_ctx,
1782 table: &mut self.table,
1783 }
1784 }
1785}
1786impl IoView for WebhookEndpointCtx {
1787 fn table(&mut self) -> &mut ResourceTable {
1788 &mut self.table
1789 }
1790}
1791impl WasiHttpView for WebhookEndpointCtx {
1792 fn http(&mut self) -> WasiHttpCtxView<'_> {
1793 WasiHttpCtxView {
1794 ctx: &mut self.http_ctx,
1795 table: &mut self.table,
1796 hooks: &mut self.http_hooks,
1797 }
1798 }
1799}
1800
/// Everything needed to serve one incoming HTTP request. The handler is consumed by
/// `handle_request`, and most fields are forwarded into `WebhookEndpointCtx::new`.
struct RequestHandler {
    /// Deployment recorded on executions created while serving the request.
    deployment_id: DeploymentId,
    /// Engine used to create the per-request `Store`.
    engine: Arc<Engine>,
    /// Clock handed over to the endpoint context.
    clock_fn: Box<dyn ClockFn>,
    /// Sleep implementation handed over to the endpoint context.
    sleep: Arc<dyn Sleep>,
    /// Database pool handed over to the endpoint context.
    db_pool: Arc<dyn DbPool>,
    /// Registry used by the context to resolve imported functions.
    fn_registry: Arc<dyn FunctionRegistry>,
    /// Top-level execution ID assigned to this request.
    execution_id: ExecutionIdTopLevel,
    /// Routes the request's method and URI to a linked webhook instance.
    router: Arc<MethodAwareRouter<WebhookEndpointInstanceLinked>>,
    /// Watcher used by the context to detect a cancelled connection.
    connection_drop_watcher: watch::Receiver<()>,
    /// Watcher used by the context to detect server termination.
    server_termination_watcher: watch::Receiver<()>,
    /// Sender passed to the matched instance's `build` call.
    log_forwarder_sender: mpsc::Sender<LogInfoAppendRow>,
}
1814
1815fn respond(body: &str, status_code: StatusCode) -> hyper::Response<HyperOutgoingBody> {
1816 let body = UnsyncBoxBody::new(http_body_util::BodyExt::map_err(
1817 http_body_util::Full::new(Bytes::copy_from_slice(body.as_bytes())),
1818 |_| unreachable!(),
1819 ));
1820 hyper::Response::builder()
1821 .status(status_code)
1822 .body(body)
1823 .unwrap()
1824}
1825
1826impl RequestHandler {
1827 #[instrument(skip_all, name="incoming webhook request", fields(execution_id = %self.execution_id))]
1828 async fn handle_request(
1829 self,
1830 req: hyper::Request<hyper::body::Incoming>,
1831 max_inflight_requests: Option<Arc<tokio::sync::Semaphore>>,
1832 ) -> Result<hyper::Response<HyperOutgoingBody>, hyper::Error> {
1833 let http_request_guard = if let Some(http_request_semaphore) = &max_inflight_requests {
1834 http_request_semaphore.clone().try_acquire_owned().map(Some)
1835 } else {
1836 Ok(None)
1837 };
1838 let Ok(http_request_guard) = http_request_guard else {
1839 debug!(method = %req.method(), uri = %req.uri(), "Too many requests");
1840 return Ok::<_, hyper::Error>(respond("Out of permits", StatusCode::TOO_MANY_REQUESTS));
1841 };
1842
1843 let res = self
1844 .handle_request_inner(req, http_request_guard, Span::current())
1845 .await;
1846 match res {
1847 Ok(body) => Ok(body),
1848 Err(err) => {
1849 debug!("{err:?}");
1850 Ok(match err {
1851 HandleRequestError::IncomingRequestError(err) => respond(
1852 &format!("Incoming request error: {err}"),
1853 StatusCode::BAD_REQUEST,
1854 ),
1855 HandleRequestError::ResponseCreationError(err) => respond(
1856 &format!("Cannot create response: {err}"),
1857 StatusCode::INTERNAL_SERVER_ERROR,
1858 ),
1859 HandleRequestError::InstantiationError(err) => respond(
1860 &format!("Cannot instantiate: {err}"),
1861 StatusCode::SERVICE_UNAVAILABLE,
1862 ),
1863 HandleRequestError::ErrorCode(code) => respond(
1864 &format!("Error code: {code}"),
1865 StatusCode::INTERNAL_SERVER_ERROR,
1866 ),
1867 HandleRequestError::ExecutionError(_) => {
1868 respond("Component Error", StatusCode::INTERNAL_SERVER_ERROR)
1869 }
1870 HandleRequestError::RouteNotFound => {
1871 respond("Route not found", StatusCode::NOT_FOUND)
1872 }
1873 HandleRequestError::Timeout => respond("Timeout", StatusCode::REQUEST_TIMEOUT),
1874 HandleRequestError::InstanceLimitReached => {
1875 respond("Instance limit reached", StatusCode::SERVICE_UNAVAILABLE)
1876 }
1877 })
1878 }
1879 }
1880 }
1881
1882 async fn handle_request_inner(
1883 self,
1884 req: hyper::Request<hyper::body::Incoming>,
1885 http_request_guard: Option<OwnedSemaphorePermit>,
1886 request_span: Span,
1887 ) -> Result<hyper::Response<HyperOutgoingBody>, HandleRequestError> {
1888 #[derive(Debug, thiserror::Error)]
1889 #[error("timeout")]
1890 struct TimeoutError;
1891
1892 if let Some(instance_match) = self.router.find(req.method(), req.uri()) {
1893 let found_instance = instance_match.handler();
1894 let found_instance = found_instance.build(&self.log_forwarder_sender);
1895 let run_id = RunId::generate();
1896 let stdout = found_instance.stdout.as_ref().map(|stdoutput| {
1897 stdoutput.build(&ExecutionId::TopLevel(self.execution_id), run_id)
1898 });
1899 let stderr = found_instance.stderr.as_ref().map(|stdoutput| {
1900 stdoutput.build(&ExecutionId::TopLevel(self.execution_id), run_id)
1901 });
1902 let (sender, receiver) = tokio::sync::oneshot::channel();
1903 let mut store = WebhookEndpointCtx::new(
1904 self.deployment_id,
1905 &found_instance.config,
1906 &self.engine,
1907 self.clock_fn,
1908 self.sleep,
1909 self.db_pool,
1910 self.fn_registry,
1911 instance_match.params().iter(),
1912 self.execution_id,
1913 request_span.clone(),
1914 self.connection_drop_watcher,
1915 self.server_termination_watcher,
1916 stdout,
1917 stderr,
1918 run_id,
1919 found_instance.logs_storage_config.clone(),
1920 );
1921 let req = store
1922 .data_mut()
1923 .http()
1924 .new_incoming_request(Scheme::Http, req)
1925 .map_err(|err| HandleRequestError::IncomingRequestError(err.into()))?;
1926 let out = store
1927 .data_mut()
1928 .http()
1929 .new_response_outparam(sender)
1930 .map_err(|err| HandleRequestError::ResponseCreationError(err.into()))?;
1931 let proxy = found_instance
1932 .proxy_pre
1933 .instantiate_async(&mut store)
1934 .await
1935 .map_err(|err| HandleRequestError::InstantiationError(err.into()))?;
1936
1937 let task = tokio::task::spawn({
1938 let assigned_fuel = found_instance.config.fuel;
1939 async move {
1940 let _http_request_guard = http_request_guard;
1941 let result = proxy
1942 .wasi_http_incoming_handler()
1943 .call_handle(&mut store, req, out)
1944 .await
1945 .inspect_err(|err| debug!("Webhook instance returned error: {err:?}"));
1946 let ctx = store.into_data();
1947 ctx.close(result, assigned_fuel).await
1948 }
1949 .instrument(request_span)
1950 });
1951 match receiver.await {
1952 Ok(Ok(resp)) => {
1953 trace!("Streaming the response");
1954 Ok(resp)
1955 }
1956 Ok(Err(err)) => {
1957 debug!("Webhook instance sent error code {err:?}");
1958 Err(HandleRequestError::ErrorCode(err))
1959 }
1960 Err(_recv_err) => {
1961 let err = match task.await {
1969 Ok(r) => {
1970 r.expect_err("if the receiver has an error, the task must have failed")
1971 } Err(e) => e.into(), };
1974 if err.downcast_ref::<TimeoutError>().is_some() {
1975 Err(HandleRequestError::Timeout)
1976 } else {
1977 info!("Webhook task ended with ExecutionError - {err:?}");
1978 Err(HandleRequestError::ExecutionError(err.into()))
1979 }
1980 }
1981 }
1982 } else {
1983 Err(HandleRequestError::RouteNotFound)
1984 }
1985 }
1986}
/// Reasons why an incoming webhook request could not be answered by the guest.
/// `RequestHandler::handle_request` maps each variant to an HTTP status code.
#[derive(Debug, thiserror::Error)]
pub enum HandleRequestError {
    /// Registering the incoming request with the WASI HTTP context failed (answered with 400).
    #[error("incoming request error: {0}")]
    IncomingRequestError(StdError),
    /// Creating the response outparam failed (answered with 500).
    #[error("response creation error: {0}")]
    ResponseCreationError(StdError),
    /// Instantiating the component failed (answered with 503).
    #[error("instantiation error: {0}")]
    InstantiationError(StdError),
    /// The guest sent an error code instead of a response (answered with 500).
    #[error("error code: {0}")]
    ErrorCode(wasmtime_wasi_http::p2::bindings::http::types::ErrorCode),
    /// The guest task ended without a response being sent (answered with 500).
    #[error("execution error: {0}")]
    ExecutionError(StdError),
    /// No route matches the request's method and URI (answered with 404).
    #[error("route not found")]
    RouteNotFound,
    /// Answered with 503.
    #[error("instance limit reached")]
    InstanceLimitReached,
    /// Answered with 408.
    #[error("timeout")]
    Timeout,
}
2006
2007fn execution_id_into_val(execution_id: &ExecutionId) -> Val {
2008 Val::Record(vec![(
2009 "id".to_string(),
2010 Val::String(execution_id.to_string()),
2011 )])
2012}
2013
2014#[cfg(test)]
2015pub(crate) mod tests {
2016 use crate::{
2017 RunnableComponent,
2018 engines::{EngineConfig, Engines},
2019 };
2020
2021 use super::MethodAwareRouter;
2022 use assert_matches::assert_matches;
2023 use concepts::ComponentType;
2024 use hyper::{Method, Uri};
2025
2026 pub(crate) fn compile_webhook(wasm_path: &str) -> RunnableComponent {
2027 let engine = Engines::get_webhook_engine(EngineConfig::on_demand_testing()).unwrap();
2028 RunnableComponent::new(wasm_path, &engine, ComponentType::WebhookEndpoint).unwrap()
2029 }
2030
2031 pub(crate) mod fibo {
2032 use super::*;
2033 use crate::activity::activity_worker::test::compile_activity;
2034 use crate::activity::activity_worker::tests::new_activity_fibo;
2035 use crate::activity::cancel_registry::CancelRegistry;
2036 use crate::engines::{EngineConfig, Engines};
2037 use crate::http_hooks::ConfigSectionHint;
2038 use crate::http_request_policy::{AllowedHostConfig, HostPattern, MethodsPattern};
2039 use crate::std_output_stream::StdOutputConfig;
2040 use crate::testing_fn_registry::TestingFnRegistry;
2041 use crate::webhook::webhook_trigger::{
2042 self, WebhookEndpointCompiled, WebhookEndpointConfig, WebhookServerError,
2043 WebhookServerState,
2044 };
2045 use crate::workflow::workflow_worker::JoinNextBlockingStrategy;
2046 use crate::workflow::workflow_worker::test::compile_workflow;
2047 use crate::workflow::workflow_worker::tests::{FIBOA_WORKFLOW_FFQN, new_workflow_fibo};
2048 use concepts::component_id::ComponentDigest;
2049 use concepts::prefixed_ulid::{DEPLOYMENT_ID_DUMMY, RunId};
2050 use concepts::storage::DbPoolCloseable;
2051 use concepts::time::ClockFn;
2052 use concepts::time::TokioSleep;
2053 use concepts::{ComponentId, ComponentType, Params, StrVariant};
2054 use concepts::{ExecutionId, storage::DbPool};
2055 use db_tests::{Database, DbGuard, DbPoolCloseableWrapper};
2056 use executor::executor::{ExecTask, LockingStrategy};
2057 use rstest::rstest;
2058 use serde_json::json;
2059 use std::net::SocketAddr;
2060 use std::str::FromStr;
2061 use std::sync::Arc;
2062 use std::time::Duration;
2063 use test_db_macro::expand_enum_database;
2064 use test_utils::sim_clock::SimClock;
2065 use tokio::net::TcpListener;
2066 use tokio::sync::{mpsc, watch};
2067 use tracing::info;
2068 use utils::sha256sum::calculate_sha256_file;
2069
/// Test fixture holding everything `SetUpFiboWebhook::new` creates: the database handles,
/// the fibo activity and workflow executors, the simulated clock and the running webhook
/// server together with the channel senders that were created for it.
struct SetUpFiboWebhook {
    /// Join set the webhook server task is spawned on. Never read.
    /// NOTE(review): presumably stored so the server task is not aborted when the set is
    /// dropped — confirm.
    #[expect(dead_code)]
    set: tokio::task::JoinSet<Result<(), WebhookServerError>>,
    /// Guard returned by `Database::set_up`. Never read.
    /// NOTE(review): presumably keeps the test database alive — confirm.
    #[expect(dead_code)]
    guard: DbGuard,
    /// Database pool shared with the executors and the webhook server.
    db_pool: Arc<dyn DbPool>,
    /// Address the webhook server listens on (ephemeral localhost port).
    server_addr: SocketAddr,
    /// Executor created by `new_activity_fibo`; tests tick it manually.
    activity_exec: ExecTask,
    /// Executor created by `new_workflow_fibo`; tests tick it manually.
    workflow_exec: ExecTask,
    /// Simulated clock handed to the executors and the server.
    sim_clock: SimClock,
    /// Handle consumed by `close` to shut the database pool down.
    db_close: DbPoolCloseableWrapper,
    /// Sender half of the server termination watch channel. Never read.
    /// NOTE(review): presumably stored so the channel stays open — confirm.
    #[expect(dead_code)]
    server_termination_sender: watch::Sender<()>,
    /// Sender half of the server state watch channel. Never read.
    /// NOTE(review): presumably stored so the channel stays open — confirm.
    #[expect(dead_code)]
    wh_server_state_sender: watch::Sender<Arc<WebhookServerState>>,
}
2086
2087 impl SetUpFiboWebhook {
2088 async fn new(
2089 db: db_tests::Database,
2090 locking_strategy: LockingStrategy,
2091 ) -> SetUpFiboWebhook {
2092 let addr = SocketAddr::from(([127, 0, 0, 1], 0));
2093 let sim_clock = SimClock::default();
2094 let (guard, db_pool, db_close) = db.set_up().await;
2095 let activity_exec = new_activity_fibo(
2096 db_pool.clone(),
2097 sim_clock.clone_box(),
2098 TokioSleep,
2099 locking_strategy,
2100 )
2101 .await;
2102
2103 let (workflow_runnable, workflow_component_id) = compile_workflow(
2104 test_programs_fibo_workflow_builder::TEST_PROGRAMS_FIBO_WORKFLOW,
2105 )
2106 .await;
2107
2108 let fn_registry = TestingFnRegistry::new_from_components(vec![
2109 compile_activity(
2110 test_programs_fibo_activity_builder::TEST_PROGRAMS_FIBO_ACTIVITY,
2111 )
2112 .await,
2113 (workflow_runnable, workflow_component_id),
2114 ]);
2115 let cancel_registry = CancelRegistry::new();
2116 let engine =
2117 Engines::get_webhook_engine(EngineConfig::on_demand_testing()).unwrap();
2118 let workflow_exec = new_workflow_fibo(
2119 db_pool.clone(),
2120 sim_clock.clone_box(),
2121 JoinNextBlockingStrategy::Interrupt,
2122 &fn_registry,
2123 cancel_registry,
2124 locking_strategy,
2125 )
2126 .await;
2127 let (db_forwarder_sender, _) = mpsc::channel(1);
2128 let wasm_file = test_programs_fibo_webhook_builder::TEST_PROGRAMS_FIBO_WEBHOOK;
2129 let router = {
2130 let runnable_component =
2131 RunnableComponent::new(wasm_file, &engine, ComponentType::WebhookEndpoint)
2132 .unwrap();
2133 let instance = WebhookEndpointCompiled::new(
2134 WebhookEndpointConfig {
2135 component_id: ComponentId::new(
2136 ComponentType::WebhookEndpoint,
2137 StrVariant::empty(),
2138 ComponentDigest(calculate_sha256_file(wasm_file).await.unwrap().0),
2139 )
2140 .unwrap(),
2141 forward_stdout: Some(StdOutputConfig::Stdout),
2142 forward_stderr: Some(StdOutputConfig::Stdout),
2143 env_vars: Arc::from([]),
2144 fuel: None,
2145 backtrace_persist: false,
2146 subscription_interruption: None,
2147 logs_store_min_level: None,
2148 allowed_hosts: Arc::from([]),
2149 js_config: None,
2150 config_section_hint: ConfigSectionHint::WebhookEndpointWasm,
2151 },
2152 runnable_component,
2153 )
2154 .unwrap()
2155 .link(&engine, fn_registry.as_ref())
2156 .unwrap();
2157 let mut router = MethodAwareRouter::default();
2158 router.add(Some(Method::GET), "/fibo/:N/:ITERATIONS", instance);
2159 router
2160 };
2161 let tcp_listener = TcpListener::bind(addr).await.unwrap();
2162 let server_addr = tcp_listener.local_addr().unwrap();
2163 info!("Listening on port {}", server_addr.port());
2164 let (server_termination_sender, server_termination_watcher) = watch::channel(());
2165 let initial_state = Arc::new(webhook_trigger::WebhookServerState {
2166 deployment_id: DEPLOYMENT_ID_DUMMY,
2167 router: Arc::new(router),
2168 fn_registry,
2169 });
2170 let (wh_server_state_sender, wh_server_state_watcher) =
2171 watch::channel(initial_state);
2172 let mut set = tokio::task::JoinSet::new();
2173 set.spawn(webhook_trigger::server(
2174 "test".to_string(),
2175 tcp_listener,
2176 engine,
2177 wh_server_state_watcher,
2178 db_forwarder_sender,
2179 db_pool.clone(),
2180 sim_clock.clone_box(),
2181 Arc::new(TokioSleep),
2182 None,
2183 server_termination_watcher,
2184 ));
2185 SetUpFiboWebhook {
2186 set,
2187 guard,
2188 db_pool,
2189 server_addr,
2190 activity_exec,
2191 workflow_exec,
2192 sim_clock,
2193 db_close,
2194 server_termination_sender,
2195 wh_server_state_sender,
2196 }
2197 }
2198
2199 async fn fibo_fetch(
2200 server_addr: &str,
2201 n: u8,
2202 iterations: u32,
2203 expected_status_code: u16,
2204 ) -> String {
2205 let resp = reqwest::get(format!("http://{server_addr}/fibo/{n}/{iterations}"))
2206 .await
2207 .unwrap();
2208 assert_eq!(resp.status().as_u16(), expected_status_code);
2209 resp.text().await.unwrap()
2210 }
2211
2212 async fn close(self) {
2213 self.db_close.close().await;
2214 }
2215 }
2216
2217 #[rstest]
2218 #[tokio::test]
2219 async fn hardcoded_result_should_work(
2220 #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
2221 locking_strategy: LockingStrategy,
2222 ) {
2223 test_utils::set_up();
2224 let fibo_webhook_harness =
2225 SetUpFiboWebhook::new(Database::Sqlite, locking_strategy).await;
2226 let server_addr = fibo_webhook_harness.server_addr.to_string();
2227 assert_eq!(
2228 "fiboa(1, 0) = hardcoded: 1",
2229 SetUpFiboWebhook::fibo_fetch(&server_addr, 1, 0, 200).await
2230 );
2231 }
2232
2233 #[expand_enum_database]
2234 #[rstest]
2235 #[tokio::test]
2236 async fn direct_call_should_work(
2237 db: Database,
2238 #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
2239 locking_strategy: LockingStrategy,
2240 ) {
2241 test_utils::set_up();
2242 let fibo_webhook_harness = SetUpFiboWebhook::new(db, locking_strategy).await;
2243 let server_addr = fibo_webhook_harness.server_addr.to_string();
2244 let fetch_task =
2245 tokio::spawn(
2246 async move { SetUpFiboWebhook::fibo_fetch(&server_addr, 2, 1, 200).await },
2247 );
2248
2249 let now = fibo_webhook_harness.sim_clock.now();
2250 while fibo_webhook_harness
2251 .workflow_exec
2252 .tick_test_await(now, RunId::generate())
2253 .await
2254 .is_empty()
2255 {
2256 tokio::time::sleep(Duration::from_millis(100)).await;
2257 }
2258 assert_eq!(
2260 1,
2261 fibo_webhook_harness
2262 .activity_exec
2263 .tick_test_await(now, RunId::generate())
2264 .await
2265 .len()
2266 );
2267 assert_eq!(
2269 1,
2270 fibo_webhook_harness
2271 .workflow_exec
2272 .tick_test_await(now, RunId::generate())
2273 .await
2274 .len()
2275 );
2276 let res = fetch_task.await.unwrap();
2277 assert_eq!("fiboa(2, 1) = direct call: 1", res);
2278 }
2279
2280 #[expand_enum_database]
2281 #[rstest]
2282 #[tokio::test]
2283 async fn scheduling_should_work(
2284 db: db_tests::Database,
2285 #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
2286 locking_strategy: LockingStrategy,
2287 ) {
2288 test_utils::set_up();
2289 let fibo_webhook_harness = SetUpFiboWebhook::new(db, locking_strategy).await;
2290 let server_addr = fibo_webhook_harness.server_addr.to_string();
2291 let n = 10;
2292 let iterations = 1;
2293 let resp = SetUpFiboWebhook::fibo_fetch(&server_addr, n, iterations, 200).await;
2294
2295 let execution_id = resp
2296 .strip_prefix(&format!("fiboa({n}, {iterations}) = scheduled: "))
2297 .unwrap();
2298 let execution_id = ExecutionId::from_str(execution_id).unwrap();
2299 let conn = fibo_webhook_harness.db_pool.connection().await.unwrap();
2300 let create_req = conn.get_create_request(&execution_id).await.unwrap();
2301 assert_eq!(FIBOA_WORKFLOW_FFQN, create_req.ffqn);
2302 let expected_params = Params::from_json_values_test(vec![json!(10), json!(1)]);
2303 assert_eq!(
2304 serde_json::to_string(&expected_params).unwrap(),
2305 serde_json::to_string(&create_req.params).unwrap()
2306 );
2307 }
2308
2309 #[rstest]
2310 #[tokio::test]
2311 async fn test_routing_error_handling(
2312 #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
2313 locking_strategy: LockingStrategy,
2314 ) {
2315 test_utils::set_up();
2316 let fibo_webhook_harness =
2317 SetUpFiboWebhook::new(Database::Sqlite, locking_strategy).await;
2318 let resp = reqwest::get(format!(
2320 "http://{}/unknown",
2321 &fibo_webhook_harness.server_addr
2322 ))
2323 .await
2324 .unwrap();
2325 assert_eq!(resp.status().as_u16(), 404);
2326 assert_eq!("Route not found", resp.text().await.unwrap());
2327 let resp = reqwest::get(format!(
2329 "http://{}/fibo/0/1",
2330 &fibo_webhook_harness.server_addr
2331 ))
2332 .await
2333 .unwrap();
2334 assert_eq!(resp.status().as_u16(), 500);
2335 assert_eq!("Component Error", resp.text().await.unwrap());
2336 fibo_webhook_harness.close().await;
2337 }
2338
2339 #[expand_enum_database]
2340 #[rstest]
2341 #[tokio::test]
2342 async fn http_client_traces_should_be_captured(
2343 db: db_tests::Database,
2344 #[values(LockingStrategy::ByFfqns, LockingStrategy::ByComponentDigest)]
2345 locking_strategy: LockingStrategy,
2346 ) {
2347 use concepts::storage::ExecutionRequest;
2348 use concepts::storage::http_client_trace::{
2349 HttpClientTrace, RequestTrace, ResponseTrace,
2350 };
2351 use wiremock::{
2352 Mock, MockServer, ResponseTemplate,
2353 matchers::{method, path},
2354 };
2355 const BODY: &str = "webhook-http-trace-test-body";
2356 test_utils::set_up();
2357 let sim_clock = SimClock::default();
2358 let (db_guard, db_pool, db_close) = db.set_up().await;
2359
2360 let mock_listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
2362 let mock_address = mock_listener.local_addr().unwrap();
2363 let mock_allowed_host = format!("http://127.0.0.1:{port}", port = mock_address.port());
2364 let mock_uri = format!("{mock_allowed_host}/");
2365 let mock_server = MockServer::builder().listener(mock_listener).start().await;
2366 Mock::given(method("GET"))
2367 .and(path("/"))
2368 .respond_with(ResponseTemplate::new(200).set_body_string(BODY))
2369 .expect(1)
2370 .mount(&mock_server)
2371 .await;
2372
2373 let activity_exec = crate::activity::activity_worker::tests::new_activity_fibo(
2375 db_pool.clone(),
2376 sim_clock.clone_box(),
2377 TokioSleep,
2378 locking_strategy,
2379 )
2380 .await;
2381
2382 let (workflow_runnable, workflow_component_id) =
2383 compile_workflow(test_programs_fibo_workflow_builder::TEST_PROGRAMS_FIBO_WORKFLOW)
2384 .await;
2385
2386 let fn_registry =
2387 crate::testing_fn_registry::TestingFnRegistry::new_from_components(vec![
2388 crate::activity::activity_worker::test::compile_activity(
2389 test_programs_fibo_activity_builder::TEST_PROGRAMS_FIBO_ACTIVITY,
2390 )
2391 .await,
2392 (workflow_runnable, workflow_component_id),
2393 ]);
2394
2395 let engine = crate::engines::Engines::get_webhook_engine(
2396 crate::engines::EngineConfig::on_demand_testing(),
2397 )
2398 .unwrap();
2399
2400 let (db_forwarder_sender, _) = mpsc::channel(1);
2402 let wasm_file = test_programs_http_get_webhook_builder::TEST_PROGRAMS_HTTP_GET_WEBHOOK;
2403 let router = {
2404 let runnable_component =
2405 RunnableComponent::new(wasm_file, &engine, ComponentType::WebhookEndpoint)
2406 .unwrap();
2407 let instance = WebhookEndpointCompiled::new(
2408 WebhookEndpointConfig {
2409 component_id: ComponentId::new(
2410 ComponentType::WebhookEndpoint,
2411 StrVariant::empty(),
2412 concepts::component_id::ComponentDigest(
2413 utils::sha256sum::calculate_sha256_file(wasm_file)
2414 .await
2415 .unwrap()
2416 .0,
2417 ),
2418 )
2419 .unwrap(),
2420 forward_stdout: Some(StdOutputConfig::Stdout),
2421 forward_stderr: Some(StdOutputConfig::Stdout),
2422 env_vars: Arc::from([]),
2423 fuel: None,
2424 backtrace_persist: false,
2425 subscription_interruption: None,
2426 logs_store_min_level: None,
2427 allowed_hosts: Arc::from(vec![AllowedHostConfig {
2428 pattern: HostPattern::parse_with_methods(
2429 &mock_allowed_host,
2430 MethodsPattern::AllMethods,
2431 )
2432 .unwrap(),
2433 secret_env_mappings: Vec::new(),
2434 replace_in: hashbrown::HashSet::new(),
2435 }]),
2436 js_config: None,
2437 config_section_hint: ConfigSectionHint::WebhookEndpointWasm,
2438 },
2439 runnable_component,
2440 )
2441 .unwrap()
2442 .link(&engine, fn_registry.as_ref())
2443 .unwrap();
2444 let mut router = MethodAwareRouter::default();
2445 router.add(Some(Method::GET), "/http-get/:PORT", instance);
2446 router
2447 };
2448
2449 let tcp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2450 let server_addr = tcp_listener.local_addr().unwrap();
2451 info!("Listening on port {}", server_addr.port());
2452 let (_server_termination_sender, server_termination_watcher) = watch::channel(());
2453 let (_wh_server_state_sender, wh_server_state_watcher) =
2454 watch::channel(Arc::new(WebhookServerState {
2455 deployment_id: DEPLOYMENT_ID_DUMMY,
2456 router: Arc::new(router),
2457 fn_registry,
2458 }));
2459 let mut set = tokio::task::JoinSet::new();
2460 set.spawn(webhook_trigger::server(
2461 "test".to_string(),
2462 tcp_listener,
2463 engine,
2464 wh_server_state_watcher,
2465 db_forwarder_sender,
2466 db_pool.clone(),
2467 sim_clock.clone_box(),
2468 Arc::new(TokioSleep),
2469 None,
2470 server_termination_watcher,
2471 ));
2472
2473 let mock_port = mock_address.port();
2475 let resp = reqwest::get(format!("http://{server_addr}/http-get/{mock_port}"))
2476 .await
2477 .unwrap();
2478 assert_eq!(resp.status().as_u16(), 200);
2479 let resp_body = resp.text().await.unwrap();
2480 let mut lines = resp_body.lines();
2481 assert_eq!(Some(BODY), lines.next());
2482 let child_execution_id_str = lines.next().expect("response must contain execution id");
2483 let child_execution_id = ExecutionId::from_str(child_execution_id_str).unwrap();
2484
2485 let conn = db_pool.connection().await.unwrap();
2487 let create_req = conn.get_create_request(&child_execution_id).await.unwrap();
2488 let webhook_exec_id = create_req
2489 .scheduled_by
2490 .expect("child must have scheduled_by set");
2491
2492 let finished_event = loop {
2494 let exec_log = conn.get(&webhook_exec_id).await.unwrap();
2495 if let ExecutionRequest::Finished {
2496 http_client_traces, ..
2497 } = &exec_log.last_event().event
2498 {
2499 break http_client_traces.clone();
2500 }
2501 tokio::time::sleep(Duration::from_millis(10)).await;
2502 };
2503
2504 let http_client_traces = finished_event
2505 .as_ref()
2506 .expect("http_client_traces must be Some");
2507 assert_eq!(1, http_client_traces.len());
2508 let trace = &http_client_traces[0];
2509 assert_matches!(
2510 trace,
2511 HttpClientTrace {
2512 req: RequestTrace {
2513 method,
2514 uri,
2515 sent_at: _,
2516 },
2517 resp: Some(ResponseTrace {
2518 status: Ok(200),
2519 finished_at: _,
2520 }),
2521 } => {
2522 assert_eq!("GET", method);
2523 assert_eq!(&mock_uri, uri);
2524 }
2525 );
2526
2527 drop(conn);
2528 drop(activity_exec);
2529 drop(set);
2530 drop(db_guard);
2531 db_close.close().await;
2532 }
2533
2534 #[tokio::test]
2535 async fn http_get_denied_host() {
2536 use wiremock::{
2537 Mock, MockServer, ResponseTemplate,
2538 matchers::{method, path},
2539 };
2540 test_utils::set_up();
2541 let sim_clock = SimClock::default();
2542 let (db_guard, db_pool, db_close) = Database::Sqlite.set_up().await;
2543
2544 let mock_listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
2546 let mock_address = mock_listener.local_addr().unwrap();
2547 let mock_server = MockServer::builder().listener(mock_listener).start().await;
2548 Mock::given(method("GET"))
2549 .and(path("/"))
2550 .respond_with(ResponseTemplate::new(200).set_body_string("should-not-reach"))
2551 .expect(0) .mount(&mock_server)
2553 .await;
2554
2555 let activity_exec = crate::activity::activity_worker::tests::new_activity_fibo(
2557 db_pool.clone(),
2558 sim_clock.clone_box(),
2559 TokioSleep,
2560 LockingStrategy::ByComponentDigest,
2561 )
2562 .await;
2563
2564 let (workflow_runnable, workflow_component_id) =
2565 compile_workflow(test_programs_fibo_workflow_builder::TEST_PROGRAMS_FIBO_WORKFLOW)
2566 .await;
2567
2568 let fn_registry =
2569 crate::testing_fn_registry::TestingFnRegistry::new_from_components(vec![
2570 crate::activity::activity_worker::test::compile_activity(
2571 test_programs_fibo_activity_builder::TEST_PROGRAMS_FIBO_ACTIVITY,
2572 )
2573 .await,
2574 (workflow_runnable, workflow_component_id),
2575 ]);
2576
2577 let engine = crate::engines::Engines::get_webhook_engine(
2578 crate::engines::EngineConfig::on_demand_testing(),
2579 )
2580 .unwrap();
2581
2582 let (db_forwarder_sender, _) = mpsc::channel(1);
2584 let wasm_file = test_programs_http_get_webhook_builder::TEST_PROGRAMS_HTTP_GET_WEBHOOK;
2585 let router = {
2586 let runnable_component =
2587 RunnableComponent::new(wasm_file, &engine, ComponentType::WebhookEndpoint)
2588 .unwrap();
2589 let instance = WebhookEndpointCompiled::new(
2590 WebhookEndpointConfig {
2591 component_id: ComponentId::new(
2592 ComponentType::WebhookEndpoint,
2593 StrVariant::empty(),
2594 concepts::component_id::ComponentDigest(
2595 utils::sha256sum::calculate_sha256_file(wasm_file)
2596 .await
2597 .unwrap()
2598 .0,
2599 ),
2600 )
2601 .unwrap(),
2602 forward_stdout: Some(StdOutputConfig::Stdout),
2603 forward_stderr: Some(StdOutputConfig::Stdout),
2604 env_vars: Arc::from([]),
2605 fuel: None,
2606 backtrace_persist: false,
2607 subscription_interruption: None,
2608 logs_store_min_level: None,
2609 allowed_hosts: Arc::from([]), js_config: None,
2611 config_section_hint: ConfigSectionHint::WebhookEndpointWasm,
2612 },
2613 runnable_component,
2614 )
2615 .unwrap()
2616 .link(&engine, fn_registry.as_ref())
2617 .unwrap();
2618 let mut router = MethodAwareRouter::default();
2619 router.add(Some(Method::GET), "/http-get/:PORT", instance);
2620 router
2621 };
2622
2623 let tcp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2624 let server_addr = tcp_listener.local_addr().unwrap();
2625 info!("Listening on port {}", server_addr.port());
2626 let (_server_termination_sender, server_termination_watcher) = watch::channel(());
2627 let (_wh_server_state_sender, wh_server_state_watcher) =
2628 watch::channel(Arc::new(WebhookServerState {
2629 deployment_id: DEPLOYMENT_ID_DUMMY,
2630 router: Arc::new(router),
2631 fn_registry,
2632 }));
2633 let mut set = tokio::task::JoinSet::new();
2634 set.spawn(webhook_trigger::server(
2635 "test".to_string(),
2636 tcp_listener,
2637 engine,
2638 wh_server_state_watcher,
2639 db_forwarder_sender,
2640 db_pool.clone(),
2641 sim_clock.clone_box(),
2642 Arc::new(TokioSleep),
2643 None,
2644 server_termination_watcher,
2645 ));
2646
2647 let mock_port = mock_address.port();
2649 let resp = reqwest::get(format!("http://{server_addr}/http-get/{mock_port}"))
2650 .await
2651 .unwrap();
2652 assert_eq!(resp.status().as_u16(), 500);
2654 let body = resp.text().await.unwrap();
2655 assert_eq!("Component Error", body);
2656
2657 mock_server.verify().await;
2659
2660 drop(activity_exec);
2661 drop(set);
2662 drop(db_guard);
2663 db_close.close().await;
2664 }
2665 }
2666
2667 pub(crate) mod js_runtime {
2668 use crate::RunnableComponent;
2669 use crate::engines::{EngineConfig, Engines};
2670 use crate::std_output_stream::StdOutputConfig;
2671 use crate::testing_fn_registry::TestingFnRegistry;
2672 use crate::webhook::webhook_trigger::{
2673 self, MethodAwareRouter, WebhookEndpointCompiled, WebhookEndpointConfig,
2674 WebhookEndpointJsConfig, WebhookServerError, WebhookServerState,
2675 };
2676 use concepts::component_id::ComponentDigest;
2677 use concepts::prefixed_ulid::DEPLOYMENT_ID_DUMMY;
2678 use concepts::time::{ClockFn, TokioSleep};
2679 use concepts::{ComponentId, ComponentType, StrVariant};
2680 use std::collections::HashMap;
2681 use std::net::SocketAddr;
2682 use std::sync::Arc;
2683 use test_utils::sim_clock::SimClock;
2684 use tokio::net::TcpListener;
2685 use tokio::sync::{mpsc, watch};
2686 use tracing::info;
2687 use utils::sha256sum::calculate_sha256_file;
2688
/// Holds the sender halves of the two watch channels created for a test webhook server.
/// Neither field is ever read.
/// NOTE(review): presumably returned to the tests so the channels stay open for as long
/// as the test holds this value — confirm.
struct WatchGuard {
    /// Sender of the server termination watch channel.
    #[expect(dead_code)]
    server_termination_sender: watch::Sender<()>,
    /// Sender of the server state watch channel.
    #[expect(dead_code)]
    wh_server_state_sender: watch::Sender<Arc<WebhookServerState>>,
}
2695
2696 async fn start_js_webhook_server(
2697 source: &str,
2698 ) -> (
2699 tokio::task::JoinSet<Result<(), WebhookServerError>>,
2700 SocketAddr,
2701 WatchGuard,
2702 ) {
2703 let sim_clock = SimClock::default();
2704 let (_guard, db_pool, _db_close) = db_tests::Database::Sqlite.set_up().await;
2705 let fn_registry = TestingFnRegistry::new_from_components(vec![]);
2706 let engine = Engines::get_webhook_engine(EngineConfig::on_demand_testing()).unwrap();
2707 let (db_forwarder_sender, _) = mpsc::channel(1);
2708 let wasm_file = webhook_js_runtime_builder::WEBHOOK_JS_RUNTIME;
2709 let router = {
2710 let runnable_component =
2711 RunnableComponent::new(wasm_file, &engine, ComponentType::WebhookEndpoint)
2712 .unwrap();
2713 let instance = WebhookEndpointCompiled::new(
2714 WebhookEndpointConfig {
2715 component_id: ComponentId::new(
2716 ComponentType::WebhookEndpoint,
2717 StrVariant::empty(),
2718 ComponentDigest(calculate_sha256_file(wasm_file).await.unwrap().0),
2719 )
2720 .unwrap(),
2721 forward_stdout: Some(StdOutputConfig::Stdout),
2722 forward_stderr: Some(StdOutputConfig::Stdout),
2723 env_vars: Arc::from([]),
2724 fuel: None,
2725 backtrace_persist: false,
2726 subscription_interruption: None,
2727 logs_store_min_level: None,
2728 allowed_hosts: Arc::from([]),
2729 js_config: Some(WebhookEndpointJsConfig {
2730 source: source.to_string(),
2731 file_name: String::new(),
2732 resolved_imports: HashMap::new(),
2733 }),
2734 config_section_hint:
2735 crate::http_hooks::ConfigSectionHint::WebhookEndpointJs,
2736 },
2737 runnable_component,
2738 )
2739 .unwrap()
2740 .link(&engine, fn_registry.as_ref())
2741 .unwrap();
2742 let mut router = MethodAwareRouter::default();
2743 router.add(None, "", instance);
2744 router
2745 };
2746 let tcp_listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
2747 .await
2748 .unwrap();
2749 let server_addr = tcp_listener.local_addr().unwrap();
2750 info!("JS webhook listening on port {}", server_addr.port());
2751 let (server_termination_sender, server_termination_watcher) = watch::channel(());
2752 let (wh_server_state_sender, wh_server_state_watcher) =
2753 watch::channel(Arc::new(WebhookServerState {
2754 deployment_id: DEPLOYMENT_ID_DUMMY,
2755 router: Arc::new(router),
2756 fn_registry,
2757 }));
2758 let mut set = tokio::task::JoinSet::new();
2759 set.spawn(webhook_trigger::server(
2760 "test-js".to_string(),
2761 tcp_listener,
2762 engine,
2763 wh_server_state_watcher,
2764 db_forwarder_sender,
2765 db_pool,
2766 sim_clock.clone_box(),
2767 Arc::new(TokioSleep),
2768 None,
2769 server_termination_watcher,
2770 ));
2771 (
2772 set,
2773 server_addr,
2774 WatchGuard {
2775 server_termination_sender,
2776 wh_server_state_sender,
2777 },
2778 )
2779 }
2780
2781 #[tokio::test]
2782 async fn webhook_js_hello_world() {
2783 test_utils::set_up();
2784 let js_source = r#"
2785 export default function handle(request) {
2786 return new Response("Hello from JS!", {
2787 status: 200,
2788 headers: { "content-type": "text/plain" },
2789 });
2790 }
2791 "#;
2792 let (_server, server_addr, _termination_sender) =
2793 start_js_webhook_server(js_source).await;
2794 let resp = reqwest::get(format!("http://{server_addr}/"))
2795 .await
2796 .unwrap();
2797 assert_eq!(resp.status().as_u16(), 200);
2798 assert_eq!("Hello from JS!", resp.text().await.unwrap());
2799 }
2800
2801 #[tokio::test]
2802 async fn webhook_js_reads_request_method_and_url() {
2803 test_utils::set_up();
2804 let js_source = r#"
2805 export default function handle(request) {
2806 return new Response(request.method + " " + request.url);
2807 }
2808 "#;
2809 let (_server, server_addr, _termination_sender) =
2810 start_js_webhook_server(js_source).await;
2811 let resp = reqwest::get(format!("http://{server_addr}/some/path"))
2812 .await
2813 .unwrap();
2814 assert_eq!(resp.status().as_u16(), 200);
2815 let body = resp.text().await.unwrap();
2816 assert!(
2817 body.contains("GET") && body.contains("/some/path"),
2818 "Expected method and path in response, got: {body}"
2819 );
2820 }
2821
2822 #[tokio::test]
2823 async fn webhook_js_custom_status_code() {
2824 test_utils::set_up();
2825 let js_source = r#"
2826 export default function handle(request) {
2827 return new Response("created", { status: 201 });
2828 }
2829 "#;
2830 let (_server, server_addr, _termination_sender) =
2831 start_js_webhook_server(js_source).await;
2832 let resp = reqwest::get(format!("http://{server_addr}/"))
2833 .await
2834 .unwrap();
2835 assert_eq!(resp.status().as_u16(), 201);
2836 assert_eq!("created", resp.text().await.unwrap());
2837 }
2838
2839 async fn start_js_webhook_server_with_http(
2840 source: &str,
2841 allowed_host: &str,
2842 ) -> (
2843 tokio::task::JoinSet<Result<(), WebhookServerError>>,
2844 SocketAddr,
2845 WatchGuard,
2846 ) {
2847 use crate::http_request_policy::{AllowedHostConfig, HostPattern, MethodsPattern};
2848 let host_pattern =
2849 HostPattern::parse_with_methods(allowed_host, MethodsPattern::AllMethods).unwrap();
2850 let sim_clock = SimClock::default();
2851 let (_guard, db_pool, _db_close) = db_tests::Database::Sqlite.set_up().await;
2852 let fn_registry = TestingFnRegistry::new_from_components(vec![]);
2853 let engine = Engines::get_webhook_engine(EngineConfig::on_demand_testing()).unwrap();
2854 let (db_forwarder_sender, _) = mpsc::channel(1);
2855 let wasm_file = webhook_js_runtime_builder::WEBHOOK_JS_RUNTIME;
2856 let router = {
2857 let runnable_component =
2858 RunnableComponent::new(wasm_file, &engine, ComponentType::WebhookEndpoint)
2859 .unwrap();
2860 let instance = WebhookEndpointCompiled::new(
2861 WebhookEndpointConfig {
2862 component_id: ComponentId::new(
2863 ComponentType::WebhookEndpoint,
2864 StrVariant::empty(),
2865 ComponentDigest(calculate_sha256_file(wasm_file).await.unwrap().0),
2866 )
2867 .unwrap(),
2868 forward_stdout: Some(StdOutputConfig::Stdout),
2869 forward_stderr: Some(StdOutputConfig::Stdout),
2870 env_vars: Arc::from([]),
2871 fuel: None,
2872 backtrace_persist: false,
2873 subscription_interruption: None,
2874 logs_store_min_level: None,
2875 allowed_hosts: Arc::from(vec![AllowedHostConfig {
2876 pattern: host_pattern,
2877 secret_env_mappings: Vec::new(),
2878 replace_in: hashbrown::HashSet::new(),
2879 }]),
2880 js_config: Some(WebhookEndpointJsConfig {
2881 source: source.to_string(),
2882 file_name: String::new(),
2883 resolved_imports: HashMap::new(),
2884 }),
2885 config_section_hint:
2886 crate::http_hooks::ConfigSectionHint::WebhookEndpointJs,
2887 },
2888 runnable_component,
2889 )
2890 .unwrap()
2891 .link(&engine, fn_registry.as_ref())
2892 .unwrap();
2893 let mut router = MethodAwareRouter::default();
2894 router.add(None, "", instance);
2895 router
2896 };
2897 let tcp_listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
2898 .await
2899 .unwrap();
2900 let server_addr = tcp_listener.local_addr().unwrap();
2901 info!(
2902 "JS webhook with HTTP listening on port {}",
2903 server_addr.port()
2904 );
2905 let (server_termination_sender, server_termination_watcher) = watch::channel(());
2906 let (wh_server_state_sender, wh_server_state_watcher) =
2907 watch::channel(Arc::new(WebhookServerState {
2908 deployment_id: DEPLOYMENT_ID_DUMMY,
2909 router: Arc::new(router),
2910 fn_registry,
2911 }));
2912 let mut set = tokio::task::JoinSet::new();
2913 set.spawn(webhook_trigger::server(
2914 "test-js-http".to_string(),
2915 tcp_listener,
2916 engine,
2917 wh_server_state_watcher,
2918 db_forwarder_sender,
2919 db_pool,
2920 sim_clock.clone_box(),
2921 Arc::new(TokioSleep),
2922 None,
2923 server_termination_watcher,
2924 ));
2925 (
2926 set,
2927 server_addr,
2928 WatchGuard {
2929 server_termination_sender,
2930 wh_server_state_sender,
2931 },
2932 )
2933 }
2934
2935 #[tokio::test]
2936 async fn webhook_js_fetch_get() {
2937 use wiremock::{
2938 Mock, MockServer, ResponseTemplate,
2939 matchers::{method, path},
2940 };
2941 test_utils::set_up();
2942 let mock_server = MockServer::start().await;
2943 Mock::given(method("GET"))
2944 .and(path("/hello"))
2945 .respond_with(ResponseTemplate::new(200).set_body_string("fetch works"))
2946 .expect(1)
2947 .mount(&mock_server)
2948 .await;
2949
2950 let url = mock_server.uri();
2951 let js_source = format!(
2952 r#"
2953 export default async function handle(request) {{
2954 const resp = await fetch("{url}/hello");
2955 const text = await resp.text();
2956 return new Response(text);
2957 }}
2958 "#
2959 );
2960
2961 let allowed = format!("http://127.0.0.1:{}", mock_server.address().port());
2962 let (_server, server_addr, _termination_sender) =
2963 start_js_webhook_server_with_http(&js_source, &allowed).await;
2964 let resp = reqwest::get(format!("http://{server_addr}/"))
2965 .await
2966 .unwrap();
2967 let status = resp.status().as_u16();
2968 let body = resp.text().await.unwrap();
2969 assert_eq!((status, body.as_str()), (200, "fetch works"));
2970 }
2971
2972 #[tokio::test]
2973 async fn webhook_js_request_headers() {
2974 test_utils::set_up();
2975 let js_source = r#"
2979 export default function handle(request) {
2980 const value = request.headers.get("x-custom");
2981 console.log("header value:`" +value + "`");
2982 return Response.json(value !== null ? value.split(", ") : []);
2983 }
2984 "#;
2985
2986 let (_server, server_addr, _termination_sender) =
2987 start_js_webhook_server(js_source).await;
2988
2989 let client = reqwest::Client::new();
2991 let resp = client
2992 .get(format!("http://{server_addr}/"))
2993 .header("x-custom", "value1")
2994 .header("x-custom", "value2")
2995 .send()
2996 .await
2997 .unwrap();
2998
2999 let status = resp.status().as_u16();
3000 let body = resp.text().await.unwrap();
3001 assert_eq!(status, 200);
3002 let headers: Vec<String> = serde_json::from_str(&body).unwrap();
3003 assert_eq!(headers, vec!["value1", "value2"]);
3004 }
3005
3006 #[tokio::test]
3007 async fn webhook_js_fetch_proxy_headers() {
3008 use wiremock::{
3009 Mock, MockServer, ResponseTemplate,
3010 matchers::{header, method},
3011 };
3012 test_utils::set_up();
3013 let mock_server = MockServer::start().await;
3014 Mock::given(method("GET"))
3015 .and(header("x-forwarded", "proxy-value"))
3016 .respond_with(ResponseTemplate::new(200).set_body_string("proxied"))
3017 .expect(1)
3018 .mount(&mock_server)
3019 .await;
3020
3021 let url = mock_server.uri();
3022 let js_source = format!(
3023 r#"
3024 export default async function handle(request) {{
3025 console.log("Got headers", [...request.headers]);
3026 return fetch("{url}/", {{ headers: request.headers }});
3027 }}
3028 "#
3029 );
3030
3031 let allowed = format!("http://127.0.0.1:{}", mock_server.address().port());
3032 let (_server, server_addr, _termination_sender) =
3033 start_js_webhook_server_with_http(&js_source, &allowed).await;
3034
3035 let client = reqwest::Client::new();
3036 let resp = client
3037 .get(format!("http://{server_addr}/"))
3038 .header("x-forwarded", "proxy-value")
3039 .send()
3040 .await
3041 .unwrap();
3042
3043 let status = resp.status().as_u16();
3044 let body = resp.text().await.unwrap();
3045 assert_eq!((status, body.as_str()), (200, "proxied"));
3046 }
3047
3048 #[tokio::test]
3049 async fn webhook_js_env_var() {
3050 test_utils::set_up();
3051 let js_source = r#"
3052 export default function handle(request) {
3053 const jsSource = process.env["__OBELISK_JS_SOURCE__"];
3054 const missing = process.env["MISSING_VAR"];
3055 return Response.json({
3056 hasJsSource: jsSource !== undefined,
3057 missingIsUndefined: missing === undefined,
3058 });
3059 }
3060 "#;
3061 let (_server, server_addr, _termination_sender) =
3062 start_js_webhook_server(js_source).await;
3063 let resp = reqwest::get(format!("http://{server_addr}/"))
3064 .await
3065 .unwrap();
3066 assert_eq!(resp.status().as_u16(), 200);
3067 let body: serde_json::Value = resp.json().await.unwrap();
3068 assert_eq!(body["hasJsSource"], serde_json::json!(true));
3069 assert_eq!(body["missingIsUndefined"], serde_json::json!(true));
3070 }
3071
3072 #[tokio::test]
3073 async fn webhook_js_request_body_text() {
3074 test_utils::set_up();
3075 let js_source = r#"
3076 export default async function handle(request) {
3077 const text = await request.text();
3078 return new Response(text, { headers: { "content-type": "text/plain" } });
3079 }
3080 "#;
3081 let (_server, server_addr, _termination_sender) =
3082 start_js_webhook_server(js_source).await;
3083 let client = reqwest::Client::new();
3084 let resp = client
3085 .post(format!("http://{server_addr}/"))
3086 .body("hello from body")
3087 .send()
3088 .await
3089 .unwrap();
3090 assert_eq!(resp.status().as_u16(), 200);
3091 assert_eq!(resp.text().await.unwrap(), "hello from body");
3092 }
3093
3094 #[tokio::test]
3095 async fn webhook_js_request_body_json() {
3096 test_utils::set_up();
3097 let js_source = r"
3098 export default async function handle(request) {
3099 const data = await request.json();
3100 return Response.json({ received: data });
3101 }
3102 ";
3103 let (_server, server_addr, _termination_sender) =
3104 start_js_webhook_server(js_source).await;
3105 let client = reqwest::Client::new();
3106 let resp = client
3107 .post(format!("http://{server_addr}/"))
3108 .header("content-type", "application/json")
3109 .body(r#"{"name":"world","value":42}"#)
3110 .send()
3111 .await
3112 .unwrap();
3113 assert_eq!(resp.status().as_u16(), 200);
3114 let body: serde_json::Value = resp.json().await.unwrap();
3115 assert_eq!(body["received"]["name"], "world");
3116 assert_eq!(body["received"]["value"], 42);
3117 }
3118
3119 #[tokio::test]
3120 async fn webhook_js_request_body_form_data() {
3121 test_utils::set_up();
3122 let js_source = r"
3123 export default async function handle(request) {
3124 const form = await request.formData();
3125 return Response.json(form);
3126 }
3127 ";
3128 let (_server, server_addr, _termination_sender) =
3129 start_js_webhook_server(js_source).await;
3130 let client = reqwest::Client::new();
3131 let resp = client
3132 .post(format!("http://{server_addr}/"))
3133 .header("content-type", "application/x-www-form-urlencoded")
3134 .body("name=Alice&city=Wonderland")
3135 .send()
3136 .await
3137 .unwrap();
3138 assert_eq!(resp.status().as_u16(), 200);
3139 let body: serde_json::Value = resp.json().await.unwrap();
3140 assert_eq!(body["name"], "Alice");
3141 assert_eq!(body["city"], "Wonderland");
3142 }
3143
3144 #[tokio::test]
3145 async fn webhook_js_request_body_empty() {
3146 test_utils::set_up();
3147 let js_source = r"
3149 export default async function handle(request) {
3150 const text = await request.text();
3151 return new Response(JSON.stringify({ len: text.length }));
3152 }
3153 ";
3154 let (_server, server_addr, _termination_sender) =
3155 start_js_webhook_server(js_source).await;
3156 let resp = reqwest::get(format!("http://{server_addr}/"))
3157 .await
3158 .unwrap();
3159 assert_eq!(resp.status().as_u16(), 200);
3160 let body: serde_json::Value = resp.json().await.unwrap();
3161 assert_eq!(body["len"], 0);
3162 }
3163
3164 #[tokio::test]
3165 async fn webhook_js_generate_execution_id() {
3166 test_utils::set_up();
3167 let js_source = r#"
3168 export default function handle(request) {
3169 const id1 = obelisk.executionIdGenerate();
3170 const id2 = obelisk.executionIdGenerate();
3171 return Response.json({
3172 id1,
3173 id2,
3174 different: id1 !== id2,
3175 hasPrefix: id1.startsWith("E_"),
3176 });
3177 }
3178 "#;
3179 let (_server, server_addr, _termination_sender) =
3180 start_js_webhook_server(js_source).await;
3181 let resp = reqwest::get(format!("http://{server_addr}/"))
3182 .await
3183 .unwrap();
3184 assert_eq!(resp.status().as_u16(), 200);
3185 let body: serde_json::Value = resp.json().await.unwrap();
3186 assert_eq!(body["different"], serde_json::json!(true));
3187 assert_eq!(body["hasPrefix"], serde_json::json!(true));
3188 }
3189
3190 struct JsWebhookWithActivitiesHarness {
3192 #[expect(dead_code)]
3193 server_set: tokio::task::JoinSet<Result<(), WebhookServerError>>,
3194 server_addr: SocketAddr,
3195 activity_exec: executor::executor::ExecTask,
3196 sim_clock: SimClock,
3197 db_pool: Arc<dyn concepts::storage::DbPool>,
3198 #[expect(dead_code)]
3199 db_close: db_tests::DbPoolCloseableWrapper,
3200 #[expect(dead_code)]
3201 guard: WatchGuard,
3202 }
3203
3204 impl JsWebhookWithActivitiesHarness {
3205 async fn new(js_source: &str) -> Self {
3206 use crate::activity::activity_worker::test::compile_activity;
3207 use crate::activity::activity_worker::tests::new_activity_fibo;
3208 use concepts::time::TokioSleep;
3209 use executor::executor::LockingStrategy;
3210
3211 let sim_clock = SimClock::default();
3212 let (_guard, db_pool, db_close) = db_tests::Database::Sqlite.set_up().await;
3213
3214 let activity_exec = new_activity_fibo(
3216 db_pool.clone(),
3217 sim_clock.clone_box(),
3218 TokioSleep,
3219 LockingStrategy::ByComponentDigest,
3220 )
3221 .await;
3222
3223 let fn_registry = TestingFnRegistry::new_from_components(vec![
3225 compile_activity(
3226 test_programs_fibo_activity_builder::TEST_PROGRAMS_FIBO_ACTIVITY,
3227 )
3228 .await,
3229 ]);
3230
3231 let engine =
3232 Engines::get_webhook_engine(EngineConfig::on_demand_testing()).unwrap();
3233 let (db_forwarder_sender, _) = mpsc::channel(1);
3234 let wasm_file = webhook_js_runtime_builder::WEBHOOK_JS_RUNTIME;
3235
3236 let router = {
3237 let runnable_component =
3238 RunnableComponent::new(wasm_file, &engine, ComponentType::WebhookEndpoint)
3239 .unwrap();
3240 let instance = WebhookEndpointCompiled::new(
3241 WebhookEndpointConfig {
3242 component_id: ComponentId::new(
3243 ComponentType::WebhookEndpoint,
3244 StrVariant::empty(),
3245 ComponentDigest(calculate_sha256_file(wasm_file).await.unwrap().0),
3246 )
3247 .unwrap(),
3248 forward_stdout: Some(StdOutputConfig::Stdout),
3249 forward_stderr: Some(StdOutputConfig::Stdout),
3250 env_vars: Arc::from([]),
3251 fuel: None,
3252 backtrace_persist: false,
3253 subscription_interruption: None,
3254 logs_store_min_level: None,
3255 allowed_hosts: Arc::from([]),
3256 js_config: Some(WebhookEndpointJsConfig {
3257 source: js_source.to_string(),
3258 file_name: String::new(),
3259 resolved_imports: HashMap::new(),
3260 }),
3261 config_section_hint:
3262 crate::http_hooks::ConfigSectionHint::WebhookEndpointJs,
3263 },
3264 runnable_component,
3265 )
3266 .unwrap()
3267 .link(&engine, fn_registry.as_ref())
3268 .unwrap();
3269 let mut router = MethodAwareRouter::default();
3270 router.add(None, "", instance);
3271 router
3272 };
3273
3274 let tcp_listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
3275 .await
3276 .unwrap();
3277 let server_addr = tcp_listener.local_addr().unwrap();
3278 info!(
3279 "JS webhook with activities listening on port {}",
3280 server_addr.port()
3281 );
3282 let (server_termination_sender, server_termination_watcher) = watch::channel(());
3283 let (wh_server_state_sender, wh_server_state_watcher) =
3284 watch::channel(Arc::new(WebhookServerState {
3285 deployment_id: DEPLOYMENT_ID_DUMMY,
3286 router: Arc::new(router),
3287 fn_registry,
3288 }));
3289 let mut server_set = tokio::task::JoinSet::new();
3290 server_set.spawn(webhook_trigger::server(
3291 "test-js-activities".to_string(),
3292 tcp_listener,
3293 engine,
3294 wh_server_state_watcher,
3295 db_forwarder_sender,
3296 db_pool.clone(),
3297 sim_clock.clone_box(),
3298 Arc::new(TokioSleep),
3299 None,
3300 server_termination_watcher,
3301 ));
3302
3303 Self {
3304 server_set,
3305 server_addr,
3306 activity_exec,
3307 sim_clock,
3308 db_pool,
3309 db_close,
3310 guard: WatchGuard {
3311 server_termination_sender,
3312 wh_server_state_sender,
3313 },
3314 }
3315 }
3316
3317 async fn tick_activity(&self) -> usize {
3318 use concepts::prefixed_ulid::RunId;
3319 self.activity_exec
3320 .tick_test_await(self.sim_clock.now(), RunId::generate())
3321 .await
3322 .len()
3323 }
3324 }
3325
3326 #[tokio::test]
3327 async fn webhook_js_call_activity() {
3328 test_utils::set_up();
3329 let js_source = r#"
3330 export default function handle(request) {
3331 // Call fibo(10) directly
3332 const result = obelisk.call("testing:fibo/fibo.fibo", [10]);
3333 return Response.json({ result });
3334 }
3335 "#;
3336
3337 let harness = JsWebhookWithActivitiesHarness::new(js_source).await;
3338
3339 let server_addr = harness.server_addr;
3341 let fetch_task = tokio::spawn(async move {
3342 reqwest::get(format!("http://{server_addr}/"))
3343 .await
3344 .unwrap()
3345 });
3346
3347 while harness.tick_activity().await == 0 {
3349 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3350 }
3351
3352 let resp = fetch_task.await.unwrap();
3354 assert_eq!(resp.status().as_u16(), 200);
3355 let body: serde_json::Value = resp.json().await.unwrap();
3356 assert_eq!(body["result"], serde_json::json!(55)); }
3358
3359 #[tokio::test]
3360 async fn webhook_js_schedule_activity() {
3361 use std::str::FromStr as _;
3362
3363 test_utils::set_up();
3364 let js_source = r#"
3365 export default function handle(request) {
3366 // Schedule fibo(10) for later execution
3367 const execId = obelisk.executionIdGenerate();
3368 obelisk.schedule(execId, "testing:fibo/fibo.fibo", [10], { seconds: 60 });
3369 return Response.json({ execId });
3370 }
3371 "#;
3372
3373 let harness = JsWebhookWithActivitiesHarness::new(js_source).await;
3374 let resp = reqwest::get(format!("http://{}/", harness.server_addr))
3375 .await
3376 .unwrap();
3377 assert_eq!(resp.status().as_u16(), 200);
3378 let body: serde_json::Value = resp.json().await.unwrap();
3379
3380 let exec_id_str = body["execId"].as_str().unwrap();
3382 assert!(
3383 exec_id_str.starts_with("E_"),
3384 "Expected execution ID prefix"
3385 );
3386
3387 let exec_id = concepts::ExecutionId::from_str(exec_id_str).unwrap();
3389 let conn = harness.db_pool.connection().await.unwrap();
3390 let create_req = conn.get_create_request(&exec_id).await.unwrap();
3391 assert_eq!(
3392 "testing:fibo/fibo.fibo",
3393 create_req.ffqn.to_string().as_str()
3394 );
3395 }
3396
3397 #[tokio::test]
3398 async fn webhook_js_get_status() {
3399 test_utils::set_up();
3400 let js_source = r#"
3401 export default function handle(request) {
3402 // Schedule for later, then check status
3403 const execId = obelisk.executionIdGenerate();
3404 obelisk.schedule(execId, "testing:fibo/fibo.fibo", [10], { seconds: 60 });
3405 const status = obelisk.getStatus(execId);
3406 return Response.json({ execId, executionStatus: status });
3407 }
3408 "#;
3409
3410 let harness = JsWebhookWithActivitiesHarness::new(js_source).await;
3411 let resp = reqwest::get(format!("http://{}/", harness.server_addr))
3412 .await
3413 .unwrap();
3414 assert_eq!(resp.status().as_u16(), 200);
3415 let body: serde_json::Value = resp.json().await.unwrap();
3416
3417 assert_eq!(
3419 body["executionStatus"]["status"],
3420 serde_json::json!("pendingAt")
3421 );
3422 }
3423
3424 #[tokio::test]
3425 async fn webhook_js_try_get_pending() {
3426 test_utils::set_up();
3427 let js_source = r#"
3428 export default function handle(request) {
3429 // Schedule now but don't wait for completion
3430 const execId = obelisk.executionIdGenerate();
3431 obelisk.schedule(execId, "testing:fibo/fibo.fibo", [10]);
3432 const result = obelisk.tryGet(execId);
3433 return Response.json({ pending: result === undefined });
3434 }
3435 "#;
3436
3437 let harness = JsWebhookWithActivitiesHarness::new(js_source).await;
3438 let resp = reqwest::get(format!("http://{}/", harness.server_addr))
3439 .await
3440 .unwrap();
3441 assert_eq!(resp.status().as_u16(), 200);
3442 let body: serde_json::Value = resp.json().await.unwrap();
3443
3444 assert_eq!(body["pending"], serde_json::json!(true));
3446 }
3447
3448 #[tokio::test]
3449 async fn webhook_js_call_with_error() {
3450 test_utils::set_up();
3451 let js_source = r#"
3453 export default function handle(request) {
3454 try {
3455 obelisk.call("testing:fibo/fibo.fibo", [50]);
3456 return Response.json({ result: "unexpected success" });
3457 } catch (e) {
3458 return Response.json({ error: e.message });
3459 }
3460 }
3461 "#;
3462
3463 let harness = JsWebhookWithActivitiesHarness::new(js_source).await;
3464
3465 let server_addr = harness.server_addr;
3466 let fetch_task = tokio::spawn(async move {
3467 reqwest::get(format!("http://{server_addr}/"))
3468 .await
3469 .unwrap()
3470 });
3471
3472 while harness.tick_activity().await == 0 {
3474 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3475 }
3476
3477 let resp = fetch_task.await.unwrap();
3478 assert_eq!(resp.status().as_u16(), 200);
3479 let body: serde_json::Value = resp.json().await.unwrap();
3480 assert!(body["error"].is_string());
3482 }
3483 }
3484
/// Exercises `MethodAwareRouter` lookups: method-specific routes, method-agnostic
/// routes, wildcard routes and named path parameters.
#[test]
fn routes() {
    let mut router = MethodAwareRouter::default();
    router.add(Some(Method::GET), "/foo", 1);
    router.add(Some(Method::GET), "/foo/*", 2);
    router.add(None, "/foo", 3);
    router.add(None, "/*", 4);
    router.add(None, "/", 5);
    router.add(Some(Method::GET), "/path/:param1/:param2", 6);

    // (request method, request path, expected handler)
    let expectations = [
        (Method::GET, "/foo", 1),
        (Method::GET, "/foo/", 2),
        (Method::GET, "/foo/foo/", 2),
        (Method::GET, "/foo/foo/bar", 2),
        (Method::POST, "/foo", 3),
        (Method::GET, "/", 5),
        // One segment too many for the parameterised route.
        (Method::GET, "/path/p1/p2/p3", 4),
    ];
    for (method, path, expected) in expectations {
        let found = router.find(&method, &Uri::from_static(path)).unwrap();
        assert_eq!(expected, **found.handler(), "{method} {path}");
    }

    // Named parameters are captured under their route names.
    let found = router
        .find(&Method::GET, &Uri::from_static("/path/p1/p2"))
        .unwrap();
    assert_eq!(6, **found.handler());
    let captured: hashbrown::HashMap<_, _> = found.params().into_iter().collect();
    assert_eq!(
        hashbrown::HashMap::from([("param1", "p1"), ("param2", "p2")]),
        captured
    );
}
3554
/// A method-agnostic route registered with the empty path acts as a fallback: it is
/// returned for every path that no more specific route matches.
#[test]
fn routes_empty_fallback() {
    let mut router = MethodAwareRouter::default();
    router.add(Some(Method::GET), "/foo", 1);
    router.add(None, "", 9);

    // (request path, expected handler), all looked up with GET.
    let expectations = [("/foo", 1), ("/", 9), ("/x", 9), ("/x/", 9)];
    for (path, expected) in expectations {
        let found = router
            .find(&Method::GET, &Uri::from_static(path))
            .unwrap();
        assert_eq!(expected, **found.handler(), "GET {path}");
    }
}
3590}