viceroy_lib/
execute.rs

1//! Guest code execution.
2
3use {
4    crate::{
5        acl::Acls,
6        adapt,
7        body::Body,
8        body_tee::tee,
9        cache::Cache,
10        component as compute,
11        config::{
12            Backends, DeviceDetection, Dictionaries, ExperimentalModule, Geolocation,
13            UnknownImportBehavior,
14        },
15        downstream::{prepare_request, DownstreamMetadata, DownstreamRequest, DownstreamResponse},
16        error::{ExecutionError, NonHttpResponse},
17        linking::{create_store, link_host_functions, ComponentCtx, WasmCtx},
18        object_store::ObjectStores,
19        pushpin::{proxy_through_pushpin, PushpinRedirectRequestInfo},
20        secret_store::SecretStores,
21        session::Session,
22        shielding_site::ShieldingSites,
23        upstream::TlsConfig,
24        Error,
25    },
26    futures::{
27        task::{Context, Poll},
28        Future,
29    },
30    http::StatusCode,
31    hyper::{Request, Response},
32    pin_project::pin_project,
33    std::{
34        collections::HashSet,
35        fmt, fs,
36        io::Write,
37        net::{Ipv4Addr, SocketAddr},
38        path::{Path, PathBuf},
39        pin::Pin,
40        sync::{
41            atomic::{AtomicBool, AtomicU64, Ordering},
42            Arc, Mutex,
43        },
44        thread::{self, JoinHandle},
45        time::{Duration, Instant, SystemTime},
46    },
47    tokio::sync::oneshot::{self, Sender},
48    tokio::sync::Mutex as AsyncMutex,
49    tracing::{error, event, info, info_span, warn, Instrument, Level},
50    wasmtime::{
51        component::{self, Component},
52        Engine, GuestProfiler, InstancePre, Linker, Module, ProfilingStrategy,
53    },
54    wasmtime_wasi::I32Exit,
55};
56
57pub const DEFAULT_EPOCH_INTERRUPTION_PERIOD: Duration = Duration::from_micros(50);
58
59const NEXT_REQ_PENDING_MAX: usize = 5;
60const REGION_NONE: &str = "none";
61
62enum Instance {
63    Module(Module, InstancePre<WasmCtx>),
64    Component(compute::bindings::AdapterServicePre<ComponentCtx>),
65}
66
67impl Instance {
68    fn unwrap_module(&self) -> (&Module, &InstancePre<WasmCtx>) {
69        match self {
70            Instance::Module(m, i) => (m, i),
71            Instance::Component(_) => panic!("unwrap_module called on a component"),
72        }
73    }
74}
75
76#[derive(Clone)]
77pub struct GuestProfileConfig {
78    /// Path to write profiling results from the guest. In serve mode,
79    /// this must refer to a directory, while in run mode it names
80    /// a file.
81    pub path: PathBuf,
82    /// Period at which the guest should be profiled.
83    pub sample_period: Duration,
84}
85
86pub struct NextRequest(Option<(DownstreamRequest, Arc<ExecuteCtx>)>);
87
88impl NextRequest {
89    pub fn into_request(mut self) -> Option<DownstreamRequest> {
90        self.0.take().map(|(r, _)| r)
91    }
92}
93
94impl fmt::Debug for NextRequest {
95    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96        let debug = self.0.as_ref().map(|(r, _)| r);
97        f.debug_tuple("NextRequest")
98            .field(&debug)
99            .finish_non_exhaustive()
100    }
101}
102
103impl Drop for NextRequest {
104    fn drop(&mut self) {
105        let Some((req, ctx)) = self.0.take() else {
106            return;
107        };
108
109        ctx.retry_request(req);
110    }
111}
112
113/// Execution context used by a [`ViceroyService`](struct.ViceroyService.html).
114///
115/// This is all of the state needed to instantiate a module, in order to respond to an HTTP
116/// request. Note that it is very important that `ExecuteCtx` be cheaply clonable, as it is cloned
117/// every time that a viceroy service handles an incoming connection.
118pub struct ExecuteCtx {
119    /// A reference to the global context for Wasm compilation.
120    engine: Engine,
121    /// An almost-linked Instance: each import function is linked, just needs a Store
122    instance_pre: Arc<Instance>,
123    /// The acls for this execution.
124    acls: Acls,
125    /// The backends for this execution.
126    backends: Backends,
127    /// The device detection mappings for this execution.
128    device_detection: DeviceDetection,
129    /// The geolocation mappings for this execution.
130    geolocation: Geolocation,
131    /// Preloaded TLS certificates and configuration
132    tls_config: TlsConfig,
133    /// The dictionaries for this execution.
134    dictionaries: Dictionaries,
135    /// Path to the config, defaults to None
136    config_path: Option<PathBuf>,
137    /// Where to direct logging endpoint messages, defaults to stdout
138    capture_logs: Arc<Mutex<dyn Write + Send>>,
139    /// Whether to treat stdout as a logging endpoint
140    log_stdout: bool,
141    /// Whether to treat stderr as a logging endpoint
142    log_stderr: bool,
143    /// The local Pushpin proxy port
144    local_pushpin_proxy_port: Option<u16>,
145    /// The ID to assign the next incoming request
146    next_req_id: Arc<AtomicU64>,
147    /// The ObjectStore associated with this instance of Viceroy
148    object_store: ObjectStores,
149    /// The secret stores for this execution.
150    secret_stores: SecretStores,
151    /// The shielding sites for this execution.
152    shielding_sites: ShieldingSites,
153    /// The cache for this service.
154    cache: Arc<Cache>,
155    /// Senders waiting for new requests for reusable sessions.
156    pending_reuse: Arc<AsyncMutex<Vec<Sender<NextRequest>>>>,
157    epoch_increment_thread: Option<JoinHandle<()>>,
158    // `Arc` so that it can be tracked both by this context and `epoch_increment_thread`.
159    epoch_increment_stop: Arc<AtomicBool>,
160    /// Configuration for guest profiling if enabled
161    guest_profile_config: Option<Arc<GuestProfileConfig>>,
162}
163
164impl ExecuteCtx {
165    /// Build a new execution context, given the path to a module and a set of experimental wasi modules.
166    pub fn build(
167        module_path: impl AsRef<Path>,
168        profiling_strategy: ProfilingStrategy,
169        wasi_modules: HashSet<ExperimentalModule>,
170        guest_profile_config: Option<GuestProfileConfig>,
171        unknown_import_behavior: UnknownImportBehavior,
172        adapt_components: bool,
173    ) -> Result<ExecuteCtxBuilder, Error> {
174        let input = fs::read(&module_path)?;
175
176        let is_wat = module_path
177            .as_ref()
178            .extension()
179            .map(|str| str == "wat")
180            .unwrap_or(false);
181
182        // When the input wasn't a component, but we're automatically adapting,
183        // apply the component adapter.
184        let is_component = adapt::is_component(&input);
185        let (is_wat, is_component, input) = if !is_component && adapt_components {
186            let input = if is_wat {
187                let text = String::from_utf8(input).map_err(|_| {
188                    anyhow::anyhow!("Failed to parse {}", module_path.as_ref().display())
189                })?;
190                adapt::adapt_wat(&text)?
191            } else {
192                adapt::adapt_bytes(&input)?
193            };
194
195            (false, true, input)
196        } else {
197            (is_wat, is_component, input)
198        };
199
200        let config = &configure_wasmtime(is_component, profiling_strategy);
201        let engine = Engine::new(config)?;
202        let instance_pre = if is_component {
203            warn!(
204                "
205
206   +------------------------------------------------------------------------+
207   |                                                                        |
208   | Wasm Component support in viceroy is in active development, and is not |
209   |                    supported for general consumption.                  |
210   |                                                                        |
211   +------------------------------------------------------------------------+
212
213            "
214            );
215
216            // If logging isn't enabled, print the notice to stderr.
217            if !tracing::enabled!(Level::WARN) {
218                eprintln!(
219                    "
220
221   +------------------------------------------------------------------------+
222   |                                                                        |
223   | Wasm Component support in viceroy is in active development, and is not |
224   |                    supported for general consumption.                  |
225   |                                                                        |
226   +------------------------------------------------------------------------+
227
228            "
229                );
230            }
231
232            let mut linker: component::Linker<ComponentCtx> = component::Linker::new(&engine);
233            compute::link_host_functions(&mut linker)?;
234            let component = if is_wat {
235                Component::from_file(&engine, &module_path)?
236            } else {
237                Component::from_binary(&engine, &input)?
238            };
239
240            match unknown_import_behavior {
241                UnknownImportBehavior::LinkError => (),
242                UnknownImportBehavior::Trap => {
243                    linker.define_unknown_imports_as_traps(&component)?
244                }
245            }
246
247            let instance_pre = linker.instantiate_pre(&component)?;
248            Instance::Component(compute::bindings::AdapterServicePre::new(instance_pre)?)
249        } else {
250            let mut linker = Linker::new(&engine);
251            link_host_functions(&mut linker, &wasi_modules)?;
252            let module = if is_wat {
253                Module::from_file(&engine, &module_path)?
254            } else {
255                Module::from_binary(&engine, &input)?
256            };
257
258            match unknown_import_behavior {
259                UnknownImportBehavior::LinkError => (),
260                UnknownImportBehavior::Trap => linker.define_unknown_imports_as_traps(&module)?,
261            }
262
263            let instance_pre = linker.instantiate_pre(&module)?;
264            Instance::Module(module, instance_pre)
265        };
266
267        // Create the epoch-increment thread. Note that the period for epoch
268        // interruptions is driven by the guest profiling sample period if
269        // provided as guest stack sampling is done from the epoch
270        // interruption callback.
271
272        let epoch_increment_stop = Arc::new(AtomicBool::new(false));
273        let engine_clone = engine.clone();
274        let epoch_increment_stop_clone = epoch_increment_stop.clone();
275        let sample_period = guest_profile_config
276            .as_ref()
277            .map(|c| c.sample_period)
278            .unwrap_or(DEFAULT_EPOCH_INTERRUPTION_PERIOD);
279        let epoch_increment_thread = Some(thread::spawn(move || {
280            while !epoch_increment_stop_clone.load(Ordering::Relaxed) {
281                thread::sleep(sample_period);
282                engine_clone.increment_epoch();
283            }
284        }));
285
286        let inner = Self {
287            engine,
288            instance_pre: Arc::new(instance_pre),
289            acls: Acls::new(),
290            backends: Backends::default(),
291            device_detection: DeviceDetection::default(),
292            geolocation: Geolocation::default(),
293            tls_config: TlsConfig::new()?,
294            dictionaries: Dictionaries::default(),
295            config_path: None,
296            capture_logs: Arc::new(Mutex::new(std::io::stdout())),
297            log_stdout: false,
298            log_stderr: false,
299            local_pushpin_proxy_port: None,
300            next_req_id: Arc::new(AtomicU64::new(0)),
301            object_store: ObjectStores::new(),
302            secret_stores: SecretStores::new(),
303            shielding_sites: ShieldingSites::new(),
304            epoch_increment_thread,
305            epoch_increment_stop,
306            guest_profile_config: guest_profile_config.map(|c| Arc::new(c)),
307            cache: Arc::new(Cache::default()),
308            pending_reuse: Arc::new(AsyncMutex::new(vec![])),
309        };
310
311        Ok(ExecuteCtxBuilder { inner })
312    }
313
314    /// Create a new execution context, given the path to a module and a set of experimental wasi modules.
315    pub fn new(
316        module_path: impl AsRef<Path>,
317        profiling_strategy: ProfilingStrategy,
318        wasi_modules: HashSet<ExperimentalModule>,
319        guest_profile_config: Option<GuestProfileConfig>,
320        unknown_import_behavior: UnknownImportBehavior,
321        adapt_components: bool,
322    ) -> Result<Arc<Self>, Error> {
323        ExecuteCtx::build(
324            module_path,
325            profiling_strategy,
326            wasi_modules,
327            guest_profile_config,
328            unknown_import_behavior,
329            adapt_components,
330        )?
331        .finish()
332    }
333
334    /// Get the engine for this execution context.
335    pub fn engine(&self) -> &Engine {
336        &self.engine
337    }
338
339    /// Get the acls for this execution context.
340    pub fn acls(&self) -> &Acls {
341        &self.acls
342    }
343
344    /// Get the backends for this execution context.
345    pub fn backends(&self) -> &Backends {
346        &self.backends
347    }
348
349    /// Get the device detection mappings for this execution context.
350    pub fn device_detection(&self) -> &DeviceDetection {
351        &self.device_detection
352    }
353
354    /// Get the geolocation mappings for this execution context.
355    pub fn geolocation(&self) -> &Geolocation {
356        &self.geolocation
357    }
358
359    /// Get the dictionaries for this execution context.
360    pub fn dictionaries(&self) -> &Dictionaries {
361        &self.dictionaries
362    }
363
364    /// Where to direct logging endpoint messages. Defaults to stdout.
365    pub fn capture_logs(&self) -> Arc<Mutex<dyn Write + Send>> {
366        self.capture_logs.clone()
367    }
368
369    /// Whether to treat stdout as a logging endpoint.
370    pub fn log_stdout(&self) -> bool {
371        self.log_stdout
372    }
373
374    /// Whether to treat stderr as a logging endpoint.
375    pub fn log_stderr(&self) -> bool {
376        self.log_stderr
377    }
378
379    /// Gets the TLS configuration
380    pub fn tls_config(&self) -> &TlsConfig {
381        &self.tls_config
382    }
383
384    async fn maybe_receive_response(
385        receiver: oneshot::Receiver<DownstreamResponse>,
386    ) -> Option<(Response<Body>, Option<anyhow::Error>)> {
387        match receiver.await.ok()? {
388            DownstreamResponse::Http(resp) => Some((resp, None)),
389            DownstreamResponse::RedirectToPushpin(info) => Some((
390                Response::new(Body::empty()),
391                Some(NonHttpResponse::PushpinRedirect(info).into()),
392            )),
393        }
394    }
395
396    /// Asynchronously handle a request.
397    ///
398    /// This method fully instantiates the wasm module housed within the `ExecuteCtx`,
399    /// including running the wasm start function. It then proceeds to execute the
400    /// instantiated module's WASI entry point, running to completion. If execution
401    /// results in an error, a response is still produced, but with a 500 status code.
402    ///
403    /// Build time: Before you build or test your code, we recommend to set the release flag
404    /// e.g. `cargo test --release` otherwise the execution will be very slow. This has to do
405    /// with the Cranelift compiler, which is extremely slow when compiled in debug mode.
406    ///
407    /// # Example
408    ///
409    /// ```no_run
410    /// # use std::collections::HashSet;
411    /// use hyper::{Body, http::Request};
412    /// # use viceroy_lib::{Error, ExecuteCtx, ProfilingStrategy, ViceroyService};
413    /// # async fn f() -> Result<(), Error> {
414    /// # let req = Request::new(Body::from(""));
415    /// let adapt_core_wasm = false;
416    /// let ctx = ExecuteCtx::new("path/to/a/file.wasm", ProfilingStrategy::None, HashSet::new(), None, Default::default(), adapt_core_wasm)?;
417    /// let local = "127.0.0.1:80".parse().unwrap();
418    /// let remote = "127.0.0.1:0".parse().unwrap();
419    /// let resp = ctx.handle_request(req, local, remote).await?;
420    /// # Ok(())
421    /// # }
422    /// ```
423    pub async fn handle_request(
424        self: Arc<Self>,
425        mut incoming_req: Request<hyper::Body>,
426        local: SocketAddr,
427        remote: SocketAddr,
428    ) -> Result<(Response<Body>, Option<anyhow::Error>), Error> {
429        let orig_req_on_upgrade = hyper::upgrade::on(&mut incoming_req);
430        let (incoming_req_parts, incoming_req_body) = incoming_req.into_parts();
431        let local_pushpin_proxy_port = self.local_pushpin_proxy_port;
432
433        let (body_for_wasm, orig_body_tee) = tee(incoming_req_body).await;
434        let orig_request_info_for_pushpin =
435            PushpinRedirectRequestInfo::from_parts(&incoming_req_parts);
436
437        let original_headers = incoming_req_parts.headers.clone();
438        let req = prepare_request(Request::from_parts(incoming_req_parts, body_for_wasm))?;
439
440        let req_id = self
441            .next_req_id
442            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
443
444        let metadata = DownstreamMetadata {
445            req_id,
446            server_addr: local,
447            client_addr: remote,
448            compliance_region: String::from(REGION_NONE),
449            original_headers,
450        };
451
452        let (resp, mut err) = self.reuse_or_spawn_guest(req, metadata).await;
453
454        let span = info_span!("request", id = req_id);
455        let _span = span.enter();
456
457        info!("response status: {:?}", resp.status());
458
459        if let Some(e) = err {
460            match e.downcast::<NonHttpResponse>() {
461                Ok(NonHttpResponse::PushpinRedirect(redirect_info)) => {
462                    let backend_name = redirect_info.backend_name;
463                    let redirect_request_info = redirect_info.request_info;
464                    info!("Pushpin redirect signaled to backend '{}'", backend_name);
465
466                    let local_pushpin_proxy_port = match local_pushpin_proxy_port {
467                        None => {
468                            error!("Pushpin redirect signaled, but Pushpin mode not enabled.");
469                            let err = anyhow::anyhow!(
470                                "Pushpin redirect signaled, but Pushpin mode not enabled."
471                            );
472                            let resp = Response::builder()
473                                .status(StatusCode::INTERNAL_SERVER_ERROR)
474                                .body(Body::from(hyper::Body::from(err.to_string())))?;
475                            return Ok((resp, Some(err)));
476                        }
477                        Some(port) => port,
478                    };
479
480                    let proxy_resp = proxy_through_pushpin(
481                        SocketAddr::new(Ipv4Addr::LOCALHOST.into(), local_pushpin_proxy_port),
482                        backend_name,
483                        redirect_request_info,
484                        orig_request_info_for_pushpin,
485                        orig_body_tee,
486                        orig_req_on_upgrade,
487                    )
488                    .await;
489
490                    let (p, hyper_body) = proxy_resp.into_parts();
491                    return Ok((Response::from_parts(p, Body::from(hyper_body)), None));
492                }
493                Err(e) => {
494                    err = Some(e);
495                }
496            }
497        }
498
499        Ok((resp, err))
500    }
501
502    /// Spawn a new guest to process a request whose processing was never attempted by
503    /// a reused session.
504    pub(crate) fn retry_request(self: Arc<Self>, mut downstream: DownstreamRequest) {
505        if downstream.sender.is_closed() {
506            return;
507        }
508
509        tokio::task::spawn(async move {
510            let (sender, receiver) = oneshot::channel();
511            let original = std::mem::replace(&mut downstream.sender, sender);
512            let (resp, err) = self.spawn_guest(downstream, receiver).await;
513            let resp = guest_result_to_response(resp, err);
514            let _ = original.send(DownstreamResponse::Http(resp));
515        });
516    }
517
518    pub async fn handle_request_with_runtime_error(
519        self: Arc<Self>,
520        incoming_req: Request<hyper::Body>,
521        local: SocketAddr,
522        remote: SocketAddr,
523    ) -> Result<Response<Body>, Error> {
524        let result = self.handle_request(incoming_req, local, remote).await?;
525        let resp = guest_result_to_response(result.0, result.1);
526
527        Ok(resp)
528    }
529
530    async fn reuse_or_spawn_guest(
531        self: Arc<Self>,
532        req: Request<Body>,
533        metadata: DownstreamMetadata,
534    ) -> (Response<Body>, Option<anyhow::Error>) {
535        let (sender, receiver) = oneshot::channel();
536        let downstream = DownstreamRequest {
537            req,
538            sender,
539            metadata,
540        };
541
542        let mut next_req = NextRequest(Some((downstream, self.clone())));
543        let mut reusable = self.pending_reuse.lock().await;
544
545        while let Some(pending) = reusable.pop() {
546            match pending.send(next_req) {
547                Ok(()) => {
548                    // Drop lock and wait for the guest to process our request.
549                    drop(reusable);
550
551                    if let Some(response) = Self::maybe_receive_response(receiver).await {
552                        return response;
553                    }
554                    return (Response::default(), None);
555                }
556                Err(nr) => next_req = nr,
557            }
558        }
559
560        drop(reusable);
561
562        let downstream = next_req
563            .into_request()
564            .expect("request should still be unprocessed");
565        self.spawn_guest(downstream, receiver).await
566    }
567
568    async fn spawn_guest(
569        self: Arc<Self>,
570        downstream: DownstreamRequest,
571        receiver: oneshot::Receiver<DownstreamResponse>,
572    ) -> (Response<Body>, Option<anyhow::Error>) {
573        let active_cpu_time_us = Arc::new(AtomicU64::new(0));
574
575        // Spawn a separate task to run the guest code. That allows _this_ method to return a response early
576        // if the guest sends one, while the guest continues to run afterward within its task.
577        let req_id = downstream.metadata.req_id;
578        let guest_handle = tokio::task::spawn(CpuTimeTracking::new(
579            active_cpu_time_us.clone(),
580            self.run_guest(downstream, active_cpu_time_us)
581                .instrument(info_span!("request", id = req_id)),
582        ));
583
584        if let Some(response) = Self::maybe_receive_response(receiver).await {
585            return response;
586        }
587
588        match guest_handle
589            .await
590            .expect("guest worker finished without panicking")
591        {
592            Ok(_) => (Response::new(Body::empty()), None),
593            Err(ExecutionError::WasmTrap(e)) => {
594                event!(
595                    Level::ERROR,
596                    "There was an error handling the request {}",
597                    e.to_string()
598                );
599                (anyhow_response(&e), Some(e))
600            }
601            Err(e) => panic!("failed to run guest: {}", e),
602        }
603    }
604
605    async fn run_guest(
606        self: Arc<Self>,
607        downstream: DownstreamRequest,
608        active_cpu_time_us: Arc<AtomicU64>,
609    ) -> Result<(), ExecutionError> {
610        info!(
611            "handling request {} {}",
612            downstream.req.method(),
613            downstream.req.uri()
614        );
615        let start_timestamp = Instant::now();
616        let req_id = downstream.metadata.req_id;
617        let session = Session::new(downstream, active_cpu_time_us, self.clone());
618
619        let guest_profile_path = self.guest_profile_config.as_deref().map(|pcfg| {
620            let now = SystemTime::now()
621                .duration_since(SystemTime::UNIX_EPOCH)
622                .unwrap()
623                .as_secs();
624            pcfg.path.join(format!("{}-{}.json", now, req_id))
625        });
626
627        match self.instance_pre.as_ref() {
628            Instance::Component(instance_pre) => {
629                if self.guest_profile_config.is_some() {
630                    warn!("Components do not currently support the guest profiler");
631                }
632
633                let req = session.downstream_request();
634                let body = session.downstream_request_body();
635
636                let mut store = ComponentCtx::create_store(&self, session, None, |ctx| {
637                    ctx.arg("compute-app");
638                })
639                .map_err(ExecutionError::Context)?;
640
641                let compute = instance_pre
642                    .instantiate_async(&mut store)
643                    .await
644                    .map_err(ExecutionError::Instantiation)?;
645
646                let result = compute
647                    .fastly_compute_http_incoming()
648                    .call_handle(&mut store, req.into(), body.into())
649                    .await;
650
651                let outcome = match result {
652                    Ok(Ok(())) => Ok(()),
653
654                    Ok(Err(())) => {
655                        event!(Level::ERROR, "WebAssembly exited with an error");
656                        Err(ExecutionError::WasmTrap(anyhow::Error::msg("failed")))
657                    }
658
659                    Err(e) => {
660                        if let Some(exit) = e.downcast_ref::<I32Exit>() {
661                            if exit.0 == 0 {
662                                Ok(())
663                            } else {
664                                event!(Level::ERROR, "WebAssembly exited with error: {:?}", e);
665                                Err(ExecutionError::WasmTrap(e))
666                            }
667                        } else {
668                            event!(Level::ERROR, "WebAssembly trapped: {:?}", e);
669                            Err(ExecutionError::WasmTrap(e))
670                        }
671                    }
672                };
673
674                // Ensure the downstream response channel is closed, whether or not a response was
675                // sent during execution.
676                let resp = outcome
677                    .as_ref()
678                    .err()
679                    .map(exec_err_to_response)
680                    .unwrap_or_default();
681                store
682                    .data_mut()
683                    .session
684                    .close_downstream_response_sender(resp);
685
686                let request_duration = Instant::now().duration_since(start_timestamp);
687
688                info!(
689                    "guest completed using {} of WebAssembly heap",
690                    bytesize::ByteSize::b(store.data().limiter().memory_allocated as u64),
691                );
692
693                info!("guest completed in {:.0?}", request_duration);
694
695                outcome
696            }
697
698            Instance::Module(module, instance_pre) => {
699                let profiler = self.guest_profile_config.as_deref().map(|pcfg| {
700                    let program_name = "main";
701                    GuestProfiler::new(
702                        program_name,
703                        pcfg.sample_period,
704                        vec![(program_name.to_string(), module.clone())],
705                    )
706                });
707
708                // We currently have to postpone linking and instantiation to the guest task
709                // due to wasmtime limitations, in particular the fact that `Instance` is not `Send`.
710                // However, the fact that the module itself is created within `ExecuteCtx::new`
711                // means that the heavy lifting happens only once.
712                let mut store = create_store(&self, session, profiler, |ctx| {
713                    ctx.arg("compute-app");
714                })
715                .map_err(ExecutionError::Context)?;
716
717                let instance = instance_pre
718                    .instantiate_async(&mut store)
719                    .await
720                    .map_err(ExecutionError::Instantiation)?;
721
722                // Pull out the `_start` function, which by convention with WASI is the main entry point for
723                // an application.
724                let main_func = instance
725                    .get_typed_func::<(), ()>(&mut store, "_start")
726                    .map_err(ExecutionError::Typechecking)?;
727
728                // Invoke the entrypoint function, which may or may not send a downstream response.
729                let outcome = match main_func.call_async(&mut store, ()).await {
730                    Ok(_) => Ok(()),
731                    Err(e) => {
732                        if let Some(exit) = e.downcast_ref::<I32Exit>() {
733                            if exit.0 == 0 {
734                                Ok(())
735                            } else {
736                                event!(Level::ERROR, "WebAssembly exited with error: {:?}", e);
737                                Err(ExecutionError::WasmTrap(e))
738                            }
739                        } else {
740                            event!(Level::ERROR, "WebAssembly trapped: {:?}", e);
741                            Err(ExecutionError::WasmTrap(e))
742                        }
743                    }
744                };
745
746                // If we collected a profile, write it to the file
747                write_profile(&mut store, guest_profile_path.as_ref());
748
749                // Ensure the downstream response channel is closed, whether or not a response was
750                // sent during execution.
751                let resp = outcome
752                    .as_ref()
753                    .err()
754                    .map(exec_err_to_response)
755                    .unwrap_or_default();
756                store.data_mut().close_downstream_response_sender(resp);
757
758                let request_duration = Instant::now().duration_since(start_timestamp);
759
760                info!(
761                    "request completed using {} of WebAssembly heap",
762                    bytesize::ByteSize::b(store.data().limiter().memory_allocated as u64)
763                );
764
765                info!("request completed in {:.0?}", request_duration);
766
767                outcome
768            }
769        }
770    }
771
772    pub async fn run_main(
773        self: Arc<Self>,
774        program_name: &str,
775        args: &[String],
776    ) -> Result<(), anyhow::Error> {
777        // placeholders for request, result sender channel, and remote IP
778        let req = Request::get("http://example.com/").body(Body::empty())?;
779        let metadata = DownstreamMetadata {
780            req_id: 0,
781            server_addr: (Ipv4Addr::LOCALHOST, 80).into(),
782            client_addr: (Ipv4Addr::LOCALHOST, 0).into(),
783            compliance_region: String::from(REGION_NONE),
784            original_headers: Default::default(),
785        };
786        let (sender, receiver) = oneshot::channel();
787        let downstream = DownstreamRequest {
788            req,
789            sender,
790            metadata,
791        };
792        let active_cpu_time_us = Arc::new(AtomicU64::new(0));
793
794        let session = Session::new(downstream, active_cpu_time_us.clone(), self.clone());
795
796        if let Instance::Component(_) = self.instance_pre.as_ref() {
797            panic!("components not currently supported with `run`");
798        }
799
800        let (module, instance_pre) = self.instance_pre.unwrap_module();
801
802        let profiler = self.guest_profile_config.as_deref().map(|pcfg| {
803            GuestProfiler::new(
804                program_name,
805                pcfg.sample_period,
806                vec![(program_name.to_string(), module.clone())],
807            )
808        });
809
810        let mut store = create_store(&self, session, profiler, |builder| {
811            builder.arg(program_name);
812            for arg in args {
813                builder.arg(arg);
814            }
815        })
816        .map_err(ExecutionError::Context)?;
817
818        let instance = instance_pre
819            .instantiate_async(&mut store)
820            .await
821            .map_err(ExecutionError::Instantiation)?;
822
823        // Pull out the `_start` function, which by convention with WASI is the main entry point for
824        // an application.
825        let main_func = instance
826            .get_typed_func::<(), ()>(&mut store, "_start")
827            .map_err(ExecutionError::Typechecking)?;
828
829        // Invoke the entrypoint function and collect its exit code
830        let result =
831            CpuTimeTracking::new(active_cpu_time_us, main_func.call_async(&mut store, ())).await;
832
833        // If we collected a profile, write it to the file
834        write_profile(
835            &mut store,
836            self.guest_profile_config.as_deref().map(|cfg| &cfg.path),
837        );
838
839        // Ensure the downstream response channel is closed, whether or not a response was
840        // sent during execution.
841        store
842            .data_mut()
843            .close_downstream_response_sender(Response::default());
844
845        // We don't do anything with any response on the receiver, but
846        // it's important to keep it alive until after the program has
847        // finished.
848        drop(receiver);
849
850        result
851    }
852
853    pub fn cache(&self) -> &Arc<Cache> {
854        &self.cache
855    }
856
857    pub fn config_path(&self) -> Option<&Path> {
858        self.config_path.as_deref()
859    }
860
861    pub fn object_store(&self) -> &ObjectStores {
862        &self.object_store
863    }
864
865    pub fn secret_stores(&self) -> &SecretStores {
866        &self.secret_stores
867    }
868
869    pub fn shielding_sites(&self) -> &ShieldingSites {
870        &self.shielding_sites
871    }
872
873    pub async fn register_pending_downstream(&self) -> Option<oneshot::Receiver<NextRequest>> {
874        let mut pending = self.pending_reuse.lock().await;
875
876        if pending.len() >= NEXT_REQ_PENDING_MAX {
877            return None;
878        }
879
880        let (tx, rx) = oneshot::channel();
881        pending.push(tx);
882
883        Some(rx)
884    }
885}
886
887pub struct ExecuteCtxBuilder {
888    inner: ExecuteCtx,
889}
890
891impl ExecuteCtxBuilder {
892    pub fn finish(self) -> Result<Arc<ExecuteCtx>, Error> {
893        Ok(Arc::new(self.inner))
894    }
895
896    /// Set the acls for this execution context.
897    pub fn with_acls(mut self, acls: Acls) -> Self {
898        self.inner.acls = acls;
899        self
900    }
901
902    /// Set the backends for this execution context.
903    pub fn with_backends(mut self, backends: Backends) -> Self {
904        self.inner.backends = backends;
905        self
906    }
907
908    /// Set the device detection mappings for this execution context.
909    pub fn with_device_detection(mut self, device_detection: DeviceDetection) -> Self {
910        self.inner.device_detection = device_detection;
911        self
912    }
913
914    /// Set the geolocation mappings for this execution context.
915    pub fn with_geolocation(mut self, geolocation: Geolocation) -> Self {
916        self.inner.geolocation = geolocation;
917        self
918    }
919
920    /// Set the dictionaries for this execution context.
921    pub fn with_dictionaries(mut self, dictionaries: Dictionaries) -> Self {
922        self.inner.dictionaries = dictionaries;
923        self
924    }
925
926    /// Set the object store for this execution context.
927    pub fn with_object_stores(mut self, object_store: ObjectStores) -> Self {
928        self.inner.object_store = object_store;
929        self
930    }
931
932    /// Set the secret stores for this execution context.
933    pub fn with_secret_stores(mut self, secret_stores: SecretStores) -> Self {
934        self.inner.secret_stores = secret_stores;
935        self
936    }
937    /// Set the shielding sites for this execution context.
938    pub fn with_shielding_sites(mut self, shielding_sites: ShieldingSites) -> Self {
939        self.inner.shielding_sites = shielding_sites;
940        self
941    }
942
943    /// Set the path to the config for this execution context.
944    pub fn with_config_path(mut self, config_path: PathBuf) -> Self {
945        self.inner.config_path = Some(config_path);
946        self
947    }
948
949    /// Set where to direct logging endpoint messages for this execution
950    /// context. Defaults to stdout.
951    pub fn with_capture_logs(mut self, capture_logs: Arc<Mutex<dyn Write + Send>>) -> Self {
952        self.inner.capture_logs = capture_logs;
953        self
954    }
955
956    /// Set the stdout logging policy for this execution context.
957    pub fn with_log_stdout(mut self, log_stdout: bool) -> Self {
958        self.inner.log_stdout = log_stdout;
959        self
960    }
961
962    /// Set the stderr logging policy for this execution context.
963    pub fn with_log_stderr(mut self, log_stderr: bool) -> Self {
964        self.inner.log_stderr = log_stderr;
965        self
966    }
967
968    /// Set the local Pushpin proxy port
969    pub fn with_local_pushpin_proxy_port(mut self, local_pushpin_proxy_port: Option<u16>) -> Self {
970        self.inner.local_pushpin_proxy_port = local_pushpin_proxy_port;
971        self
972    }
973}
974
975fn write_profile(store: &mut wasmtime::Store<WasmCtx>, guest_profile_path: Option<&PathBuf>) {
976    if let (Some(profile), Some(path)) =
977        (store.data_mut().take_guest_profiler(), guest_profile_path)
978    {
979        if let Err(e) = std::fs::File::create(path)
980            .map_err(anyhow::Error::new)
981            .and_then(|output| profile.finish(std::io::BufWriter::new(output)))
982        {
983            event!(
984                Level::ERROR,
985                "failed writing profile at {}: {e:#}",
986                path.display()
987            );
988        } else {
989            event!(
990                Level::INFO,
991                "\nProfile written to: {}\nView this profile at https://profiler.firefox.com/.",
992                path.display()
993            );
994        }
995    }
996}
997
998fn guest_result_to_response(resp: Response<Body>, err: Option<anyhow::Error>) -> Response<Body> {
999    err.as_ref().map(anyhow_response).unwrap_or(resp)
1000}
1001
1002fn exec_err_to_response(err: &ExecutionError) -> Response<Body> {
1003    if let ExecutionError::WasmTrap(e) = err {
1004        anyhow_response(e)
1005    } else {
1006        panic!("failed to run guest: {err}")
1007    }
1008}
1009
1010fn anyhow_response(err: &anyhow::Error) -> Response<Body> {
1011    Response::builder()
1012        .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
1013        .body(Body::from(format!("{err:?}").into_bytes()))
1014        .unwrap()
1015}
1016
1017impl Drop for ExecuteCtx {
1018    fn drop(&mut self) {
1019        if let Some(join_handle) = self.epoch_increment_thread.take() {
1020            self.epoch_increment_stop.store(true, Ordering::Relaxed);
1021            join_handle.join().unwrap();
1022        }
1023    }
1024}
1025
1026fn configure_wasmtime(
1027    allow_components: bool,
1028    profiling_strategy: ProfilingStrategy,
1029) -> wasmtime::Config {
1030    use wasmtime::{Config, InstanceAllocationStrategy, WasmBacktraceDetails};
1031
1032    let mut config = Config::new();
1033    config.debug_info(false); // Keep this disabled - wasmtime will hang if enabled
1034    config.wasm_backtrace_details(WasmBacktraceDetails::Enable);
1035    config.async_support(true);
1036    config.epoch_interruption(true);
1037    config.profiler(profiling_strategy);
1038
1039    config.allocation_strategy(InstanceAllocationStrategy::OnDemand);
1040
1041    if allow_components {
1042        config.wasm_component_model(true);
1043    }
1044
1045    // Wasm permits the "relaxed" instructions to be nondeterministic
1046    // between runs, but requires them to be deterministic within runs.
1047    // Snapshotting a program's execution to avoid redundantly running
1048    // initialization code on each request is an important optimization,
1049    // so we enable deterministic lowerings for relaxed SIMD to ensure
1050    // that it works consistently even if the initialization runs on a
1051    // different host architecture.
1052    config.relaxed_simd_deterministic(true);
1053
1054    config
1055}
1056
1057#[pin_project]
1058struct CpuTimeTracking<F> {
1059    #[pin]
1060    future: F,
1061    time_spent: Arc<AtomicU64>,
1062}
1063
1064impl<F> CpuTimeTracking<F> {
1065    fn new(time_spent: Arc<AtomicU64>, future: F) -> Self {
1066        CpuTimeTracking { future, time_spent }
1067    }
1068}
1069
1070impl<E, F: Future<Output = Result<(), E>>> Future for CpuTimeTracking<F> {
1071    type Output = F::Output;
1072
1073    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1074        let me = self.project();
1075
1076        let start = Instant::now();
1077        let result = me.future.poll(cx);
1078        // 2^64 microseconds is over half a million years, so I'm not terribly
1079        // worried about this cast.
1080        let runtime = start.elapsed().as_micros() as u64;
1081        let _ = me.time_spent.fetch_add(runtime, Ordering::SeqCst);
1082        result
1083    }
1084}