1use {
4 crate::{
5 acl::Acls,
6 adapt,
7 body::Body,
8 body_tee::tee,
9 cache::Cache,
10 component as compute,
11 config::{
12 Backends, DeviceDetection, Dictionaries, ExperimentalModule, Geolocation,
13 UnknownImportBehavior,
14 },
15 downstream::{prepare_request, DownstreamMetadata, DownstreamRequest, DownstreamResponse},
16 error::{ExecutionError, NonHttpResponse},
17 linking::{create_store, link_host_functions, ComponentCtx, WasmCtx},
18 object_store::ObjectStores,
19 pushpin::{proxy_through_pushpin, PushpinRedirectRequestInfo},
20 secret_store::SecretStores,
21 session::Session,
22 shielding_site::ShieldingSites,
23 upstream::TlsConfig,
24 Error,
25 },
26 futures::{
27 task::{Context, Poll},
28 Future,
29 },
30 http::StatusCode,
31 hyper::{Request, Response},
32 pin_project::pin_project,
33 std::{
34 collections::HashSet,
35 fmt, fs,
36 io::Write,
37 net::{Ipv4Addr, SocketAddr},
38 path::{Path, PathBuf},
39 pin::Pin,
40 sync::{
41 atomic::{AtomicBool, AtomicU64, Ordering},
42 Arc, Mutex,
43 },
44 thread::{self, JoinHandle},
45 time::{Duration, Instant, SystemTime},
46 },
47 tokio::sync::oneshot::{self, Sender},
48 tokio::sync::Mutex as AsyncMutex,
49 tracing::{error, event, info, info_span, warn, Instrument, Level},
50 wasmtime::{
51 component::{self, Component},
52 Engine, GuestProfiler, InstancePre, Linker, Module, ProfilingStrategy,
53 },
54 wasmtime_wasi::I32Exit,
55};
56
/// Interval at which the background thread bumps the engine epoch when no
/// guest profiler sample period has been configured.
pub const DEFAULT_EPOCH_INTERRUPTION_PERIOD: Duration = Duration::from_micros(50);

/// Maximum number of senders that may be parked in `pending_reuse` at once.
const NEXT_REQ_PENDING_MAX: usize = 5;
/// Compliance region recorded in the metadata of every downstream request.
const REGION_NONE: &str = "none";
61
62enum Instance {
63 Module(Module, InstancePre<WasmCtx>),
64 Component(compute::bindings::AdapterServicePre<ComponentCtx>),
65}
66
67impl Instance {
68 fn unwrap_module(&self) -> (&Module, &InstancePre<WasmCtx>) {
69 match self {
70 Instance::Module(m, i) => (m, i),
71 Instance::Component(_) => panic!("unwrap_module called on a component"),
72 }
73 }
74}
75
/// Settings for the per-guest profiler.
#[derive(Clone, Debug)]
pub struct GuestProfileConfig {
    /// Where profiles are written. Request handling treats this as a directory
    /// and writes one `<unix-seconds>-<request-id>.json` file per request;
    /// `run_main` passes it to the profile writer as the output file itself.
    pub path: PathBuf,
    /// Sampling interval handed to the guest profiler. When a profile config is
    /// present this is also the sleep period of the epoch-increment thread.
    pub sample_period: Duration,
}
85
86pub struct NextRequest(Option<(DownstreamRequest, Arc<ExecuteCtx>)>);
87
88impl NextRequest {
89 pub fn into_request(mut self) -> Option<DownstreamRequest> {
90 self.0.take().map(|(r, _)| r)
91 }
92}
93
94impl fmt::Debug for NextRequest {
95 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96 let debug = self.0.as_ref().map(|(r, _)| r);
97 f.debug_tuple("NextRequest")
98 .field(&debug)
99 .finish_non_exhaustive()
100 }
101}
102
103impl Drop for NextRequest {
104 fn drop(&mut self) {
105 let Some((req, ctx)) = self.0.take() else {
106 return;
107 };
108
109 ctx.retry_request(req);
110 }
111}
112
113pub struct ExecuteCtx {
119 engine: Engine,
121 instance_pre: Arc<Instance>,
123 acls: Acls,
125 backends: Backends,
127 device_detection: DeviceDetection,
129 geolocation: Geolocation,
131 tls_config: TlsConfig,
133 dictionaries: Dictionaries,
135 config_path: Option<PathBuf>,
137 capture_logs: Arc<Mutex<dyn Write + Send>>,
139 log_stdout: bool,
141 log_stderr: bool,
143 local_pushpin_proxy_port: Option<u16>,
145 next_req_id: Arc<AtomicU64>,
147 object_store: ObjectStores,
149 secret_stores: SecretStores,
151 shielding_sites: ShieldingSites,
153 cache: Arc<Cache>,
155 pending_reuse: Arc<AsyncMutex<Vec<Sender<NextRequest>>>>,
157 epoch_increment_thread: Option<JoinHandle<()>>,
158 epoch_increment_stop: Arc<AtomicBool>,
160 guest_profile_config: Option<Arc<GuestProfileConfig>>,
162}
163
164impl ExecuteCtx {
165 pub fn build(
167 module_path: impl AsRef<Path>,
168 profiling_strategy: ProfilingStrategy,
169 wasi_modules: HashSet<ExperimentalModule>,
170 guest_profile_config: Option<GuestProfileConfig>,
171 unknown_import_behavior: UnknownImportBehavior,
172 adapt_components: bool,
173 ) -> Result<ExecuteCtxBuilder, Error> {
174 let input = fs::read(&module_path)?;
175
176 let is_wat = module_path
177 .as_ref()
178 .extension()
179 .map(|str| str == "wat")
180 .unwrap_or(false);
181
182 let is_component = adapt::is_component(&input);
185 let (is_wat, is_component, input) = if !is_component && adapt_components {
186 let input = if is_wat {
187 let text = String::from_utf8(input).map_err(|_| {
188 anyhow::anyhow!("Failed to parse {}", module_path.as_ref().display())
189 })?;
190 adapt::adapt_wat(&text)?
191 } else {
192 adapt::adapt_bytes(&input)?
193 };
194
195 (false, true, input)
196 } else {
197 (is_wat, is_component, input)
198 };
199
200 let config = &configure_wasmtime(is_component, profiling_strategy);
201 let engine = Engine::new(config)?;
202 let instance_pre = if is_component {
203 warn!(
204 "
205
206 +------------------------------------------------------------------------+
207 | |
208 | Wasm Component support in viceroy is in active development, and is not |
209 | supported for general consumption. |
210 | |
211 +------------------------------------------------------------------------+
212
213 "
214 );
215
216 if !tracing::enabled!(Level::WARN) {
218 eprintln!(
219 "
220
221 +------------------------------------------------------------------------+
222 | |
223 | Wasm Component support in viceroy is in active development, and is not |
224 | supported for general consumption. |
225 | |
226 +------------------------------------------------------------------------+
227
228 "
229 );
230 }
231
232 let mut linker: component::Linker<ComponentCtx> = component::Linker::new(&engine);
233 compute::link_host_functions(&mut linker)?;
234 let component = if is_wat {
235 Component::from_file(&engine, &module_path)?
236 } else {
237 Component::from_binary(&engine, &input)?
238 };
239
240 match unknown_import_behavior {
241 UnknownImportBehavior::LinkError => (),
242 UnknownImportBehavior::Trap => {
243 linker.define_unknown_imports_as_traps(&component)?
244 }
245 }
246
247 let instance_pre = linker.instantiate_pre(&component)?;
248 Instance::Component(compute::bindings::AdapterServicePre::new(instance_pre)?)
249 } else {
250 let mut linker = Linker::new(&engine);
251 link_host_functions(&mut linker, &wasi_modules)?;
252 let module = if is_wat {
253 Module::from_file(&engine, &module_path)?
254 } else {
255 Module::from_binary(&engine, &input)?
256 };
257
258 match unknown_import_behavior {
259 UnknownImportBehavior::LinkError => (),
260 UnknownImportBehavior::Trap => linker.define_unknown_imports_as_traps(&module)?,
261 }
262
263 let instance_pre = linker.instantiate_pre(&module)?;
264 Instance::Module(module, instance_pre)
265 };
266
267 let epoch_increment_stop = Arc::new(AtomicBool::new(false));
273 let engine_clone = engine.clone();
274 let epoch_increment_stop_clone = epoch_increment_stop.clone();
275 let sample_period = guest_profile_config
276 .as_ref()
277 .map(|c| c.sample_period)
278 .unwrap_or(DEFAULT_EPOCH_INTERRUPTION_PERIOD);
279 let epoch_increment_thread = Some(thread::spawn(move || {
280 while !epoch_increment_stop_clone.load(Ordering::Relaxed) {
281 thread::sleep(sample_period);
282 engine_clone.increment_epoch();
283 }
284 }));
285
286 let inner = Self {
287 engine,
288 instance_pre: Arc::new(instance_pre),
289 acls: Acls::new(),
290 backends: Backends::default(),
291 device_detection: DeviceDetection::default(),
292 geolocation: Geolocation::default(),
293 tls_config: TlsConfig::new()?,
294 dictionaries: Dictionaries::default(),
295 config_path: None,
296 capture_logs: Arc::new(Mutex::new(std::io::stdout())),
297 log_stdout: false,
298 log_stderr: false,
299 local_pushpin_proxy_port: None,
300 next_req_id: Arc::new(AtomicU64::new(0)),
301 object_store: ObjectStores::new(),
302 secret_stores: SecretStores::new(),
303 shielding_sites: ShieldingSites::new(),
304 epoch_increment_thread,
305 epoch_increment_stop,
306 guest_profile_config: guest_profile_config.map(|c| Arc::new(c)),
307 cache: Arc::new(Cache::default()),
308 pending_reuse: Arc::new(AsyncMutex::new(vec![])),
309 };
310
311 Ok(ExecuteCtxBuilder { inner })
312 }
313
314 pub fn new(
316 module_path: impl AsRef<Path>,
317 profiling_strategy: ProfilingStrategy,
318 wasi_modules: HashSet<ExperimentalModule>,
319 guest_profile_config: Option<GuestProfileConfig>,
320 unknown_import_behavior: UnknownImportBehavior,
321 adapt_components: bool,
322 ) -> Result<Arc<Self>, Error> {
323 ExecuteCtx::build(
324 module_path,
325 profiling_strategy,
326 wasi_modules,
327 guest_profile_config,
328 unknown_import_behavior,
329 adapt_components,
330 )?
331 .finish()
332 }
333
334 pub fn engine(&self) -> &Engine {
336 &self.engine
337 }
338
339 pub fn acls(&self) -> &Acls {
341 &self.acls
342 }
343
344 pub fn backends(&self) -> &Backends {
346 &self.backends
347 }
348
349 pub fn device_detection(&self) -> &DeviceDetection {
351 &self.device_detection
352 }
353
354 pub fn geolocation(&self) -> &Geolocation {
356 &self.geolocation
357 }
358
359 pub fn dictionaries(&self) -> &Dictionaries {
361 &self.dictionaries
362 }
363
364 pub fn capture_logs(&self) -> Arc<Mutex<dyn Write + Send>> {
366 self.capture_logs.clone()
367 }
368
369 pub fn log_stdout(&self) -> bool {
371 self.log_stdout
372 }
373
374 pub fn log_stderr(&self) -> bool {
376 self.log_stderr
377 }
378
379 pub fn tls_config(&self) -> &TlsConfig {
381 &self.tls_config
382 }
383
384 async fn maybe_receive_response(
385 receiver: oneshot::Receiver<DownstreamResponse>,
386 ) -> Option<(Response<Body>, Option<anyhow::Error>)> {
387 match receiver.await.ok()? {
388 DownstreamResponse::Http(resp) => Some((resp, None)),
389 DownstreamResponse::RedirectToPushpin(info) => Some((
390 Response::new(Body::empty()),
391 Some(NonHttpResponse::PushpinRedirect(info).into()),
392 )),
393 }
394 }
395
396 pub async fn handle_request(
424 self: Arc<Self>,
425 mut incoming_req: Request<hyper::Body>,
426 local: SocketAddr,
427 remote: SocketAddr,
428 ) -> Result<(Response<Body>, Option<anyhow::Error>), Error> {
429 let orig_req_on_upgrade = hyper::upgrade::on(&mut incoming_req);
430 let (incoming_req_parts, incoming_req_body) = incoming_req.into_parts();
431 let local_pushpin_proxy_port = self.local_pushpin_proxy_port;
432
433 let (body_for_wasm, orig_body_tee) = tee(incoming_req_body).await;
434 let orig_request_info_for_pushpin =
435 PushpinRedirectRequestInfo::from_parts(&incoming_req_parts);
436
437 let original_headers = incoming_req_parts.headers.clone();
438 let req = prepare_request(Request::from_parts(incoming_req_parts, body_for_wasm))?;
439
440 let req_id = self
441 .next_req_id
442 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
443
444 let metadata = DownstreamMetadata {
445 req_id,
446 server_addr: local,
447 client_addr: remote,
448 compliance_region: String::from(REGION_NONE),
449 original_headers,
450 };
451
452 let (resp, mut err) = self.reuse_or_spawn_guest(req, metadata).await;
453
454 let span = info_span!("request", id = req_id);
455 let _span = span.enter();
456
457 info!("response status: {:?}", resp.status());
458
459 if let Some(e) = err {
460 match e.downcast::<NonHttpResponse>() {
461 Ok(NonHttpResponse::PushpinRedirect(redirect_info)) => {
462 let backend_name = redirect_info.backend_name;
463 let redirect_request_info = redirect_info.request_info;
464 info!("Pushpin redirect signaled to backend '{}'", backend_name);
465
466 let local_pushpin_proxy_port = match local_pushpin_proxy_port {
467 None => {
468 error!("Pushpin redirect signaled, but Pushpin mode not enabled.");
469 let err = anyhow::anyhow!(
470 "Pushpin redirect signaled, but Pushpin mode not enabled."
471 );
472 let resp = Response::builder()
473 .status(StatusCode::INTERNAL_SERVER_ERROR)
474 .body(Body::from(hyper::Body::from(err.to_string())))?;
475 return Ok((resp, Some(err)));
476 }
477 Some(port) => port,
478 };
479
480 let proxy_resp = proxy_through_pushpin(
481 SocketAddr::new(Ipv4Addr::LOCALHOST.into(), local_pushpin_proxy_port),
482 backend_name,
483 redirect_request_info,
484 orig_request_info_for_pushpin,
485 orig_body_tee,
486 orig_req_on_upgrade,
487 )
488 .await;
489
490 let (p, hyper_body) = proxy_resp.into_parts();
491 return Ok((Response::from_parts(p, Body::from(hyper_body)), None));
492 }
493 Err(e) => {
494 err = Some(e);
495 }
496 }
497 }
498
499 Ok((resp, err))
500 }
501
502 pub(crate) fn retry_request(self: Arc<Self>, mut downstream: DownstreamRequest) {
505 if downstream.sender.is_closed() {
506 return;
507 }
508
509 tokio::task::spawn(async move {
510 let (sender, receiver) = oneshot::channel();
511 let original = std::mem::replace(&mut downstream.sender, sender);
512 let (resp, err) = self.spawn_guest(downstream, receiver).await;
513 let resp = guest_result_to_response(resp, err);
514 let _ = original.send(DownstreamResponse::Http(resp));
515 });
516 }
517
518 pub async fn handle_request_with_runtime_error(
519 self: Arc<Self>,
520 incoming_req: Request<hyper::Body>,
521 local: SocketAddr,
522 remote: SocketAddr,
523 ) -> Result<Response<Body>, Error> {
524 let result = self.handle_request(incoming_req, local, remote).await?;
525 let resp = guest_result_to_response(result.0, result.1);
526
527 Ok(resp)
528 }
529
530 async fn reuse_or_spawn_guest(
531 self: Arc<Self>,
532 req: Request<Body>,
533 metadata: DownstreamMetadata,
534 ) -> (Response<Body>, Option<anyhow::Error>) {
535 let (sender, receiver) = oneshot::channel();
536 let downstream = DownstreamRequest {
537 req,
538 sender,
539 metadata,
540 };
541
542 let mut next_req = NextRequest(Some((downstream, self.clone())));
543 let mut reusable = self.pending_reuse.lock().await;
544
545 while let Some(pending) = reusable.pop() {
546 match pending.send(next_req) {
547 Ok(()) => {
548 drop(reusable);
550
551 if let Some(response) = Self::maybe_receive_response(receiver).await {
552 return response;
553 }
554 return (Response::default(), None);
555 }
556 Err(nr) => next_req = nr,
557 }
558 }
559
560 drop(reusable);
561
562 let downstream = next_req
563 .into_request()
564 .expect("request should still be unprocessed");
565 self.spawn_guest(downstream, receiver).await
566 }
567
568 async fn spawn_guest(
569 self: Arc<Self>,
570 downstream: DownstreamRequest,
571 receiver: oneshot::Receiver<DownstreamResponse>,
572 ) -> (Response<Body>, Option<anyhow::Error>) {
573 let active_cpu_time_us = Arc::new(AtomicU64::new(0));
574
575 let req_id = downstream.metadata.req_id;
578 let guest_handle = tokio::task::spawn(CpuTimeTracking::new(
579 active_cpu_time_us.clone(),
580 self.run_guest(downstream, active_cpu_time_us)
581 .instrument(info_span!("request", id = req_id)),
582 ));
583
584 if let Some(response) = Self::maybe_receive_response(receiver).await {
585 return response;
586 }
587
588 match guest_handle
589 .await
590 .expect("guest worker finished without panicking")
591 {
592 Ok(_) => (Response::new(Body::empty()), None),
593 Err(ExecutionError::WasmTrap(e)) => {
594 event!(
595 Level::ERROR,
596 "There was an error handling the request {}",
597 e.to_string()
598 );
599 (anyhow_response(&e), Some(e))
600 }
601 Err(e) => panic!("failed to run guest: {}", e),
602 }
603 }
604
605 async fn run_guest(
606 self: Arc<Self>,
607 downstream: DownstreamRequest,
608 active_cpu_time_us: Arc<AtomicU64>,
609 ) -> Result<(), ExecutionError> {
610 info!(
611 "handling request {} {}",
612 downstream.req.method(),
613 downstream.req.uri()
614 );
615 let start_timestamp = Instant::now();
616 let req_id = downstream.metadata.req_id;
617 let session = Session::new(downstream, active_cpu_time_us, self.clone());
618
619 let guest_profile_path = self.guest_profile_config.as_deref().map(|pcfg| {
620 let now = SystemTime::now()
621 .duration_since(SystemTime::UNIX_EPOCH)
622 .unwrap()
623 .as_secs();
624 pcfg.path.join(format!("{}-{}.json", now, req_id))
625 });
626
627 match self.instance_pre.as_ref() {
628 Instance::Component(instance_pre) => {
629 if self.guest_profile_config.is_some() {
630 warn!("Components do not currently support the guest profiler");
631 }
632
633 let req = session.downstream_request();
634 let body = session.downstream_request_body();
635
636 let mut store = ComponentCtx::create_store(&self, session, None, |ctx| {
637 ctx.arg("compute-app");
638 })
639 .map_err(ExecutionError::Context)?;
640
641 let compute = instance_pre
642 .instantiate_async(&mut store)
643 .await
644 .map_err(ExecutionError::Instantiation)?;
645
646 let result = compute
647 .fastly_compute_http_incoming()
648 .call_handle(&mut store, req.into(), body.into())
649 .await;
650
651 let outcome = match result {
652 Ok(Ok(())) => Ok(()),
653
654 Ok(Err(())) => {
655 event!(Level::ERROR, "WebAssembly exited with an error");
656 Err(ExecutionError::WasmTrap(anyhow::Error::msg("failed")))
657 }
658
659 Err(e) => {
660 if let Some(exit) = e.downcast_ref::<I32Exit>() {
661 if exit.0 == 0 {
662 Ok(())
663 } else {
664 event!(Level::ERROR, "WebAssembly exited with error: {:?}", e);
665 Err(ExecutionError::WasmTrap(e))
666 }
667 } else {
668 event!(Level::ERROR, "WebAssembly trapped: {:?}", e);
669 Err(ExecutionError::WasmTrap(e))
670 }
671 }
672 };
673
674 let resp = outcome
677 .as_ref()
678 .err()
679 .map(exec_err_to_response)
680 .unwrap_or_default();
681 store
682 .data_mut()
683 .session
684 .close_downstream_response_sender(resp);
685
686 let request_duration = Instant::now().duration_since(start_timestamp);
687
688 info!(
689 "guest completed using {} of WebAssembly heap",
690 bytesize::ByteSize::b(store.data().limiter().memory_allocated as u64),
691 );
692
693 info!("guest completed in {:.0?}", request_duration);
694
695 outcome
696 }
697
698 Instance::Module(module, instance_pre) => {
699 let profiler = self.guest_profile_config.as_deref().map(|pcfg| {
700 let program_name = "main";
701 GuestProfiler::new(
702 program_name,
703 pcfg.sample_period,
704 vec![(program_name.to_string(), module.clone())],
705 )
706 });
707
708 let mut store = create_store(&self, session, profiler, |ctx| {
713 ctx.arg("compute-app");
714 })
715 .map_err(ExecutionError::Context)?;
716
717 let instance = instance_pre
718 .instantiate_async(&mut store)
719 .await
720 .map_err(ExecutionError::Instantiation)?;
721
722 let main_func = instance
725 .get_typed_func::<(), ()>(&mut store, "_start")
726 .map_err(ExecutionError::Typechecking)?;
727
728 let outcome = match main_func.call_async(&mut store, ()).await {
730 Ok(_) => Ok(()),
731 Err(e) => {
732 if let Some(exit) = e.downcast_ref::<I32Exit>() {
733 if exit.0 == 0 {
734 Ok(())
735 } else {
736 event!(Level::ERROR, "WebAssembly exited with error: {:?}", e);
737 Err(ExecutionError::WasmTrap(e))
738 }
739 } else {
740 event!(Level::ERROR, "WebAssembly trapped: {:?}", e);
741 Err(ExecutionError::WasmTrap(e))
742 }
743 }
744 };
745
746 write_profile(&mut store, guest_profile_path.as_ref());
748
749 let resp = outcome
752 .as_ref()
753 .err()
754 .map(exec_err_to_response)
755 .unwrap_or_default();
756 store.data_mut().close_downstream_response_sender(resp);
757
758 let request_duration = Instant::now().duration_since(start_timestamp);
759
760 info!(
761 "request completed using {} of WebAssembly heap",
762 bytesize::ByteSize::b(store.data().limiter().memory_allocated as u64)
763 );
764
765 info!("request completed in {:.0?}", request_duration);
766
767 outcome
768 }
769 }
770 }
771
772 pub async fn run_main(
773 self: Arc<Self>,
774 program_name: &str,
775 args: &[String],
776 ) -> Result<(), anyhow::Error> {
777 let req = Request::get("http://example.com/").body(Body::empty())?;
779 let metadata = DownstreamMetadata {
780 req_id: 0,
781 server_addr: (Ipv4Addr::LOCALHOST, 80).into(),
782 client_addr: (Ipv4Addr::LOCALHOST, 0).into(),
783 compliance_region: String::from(REGION_NONE),
784 original_headers: Default::default(),
785 };
786 let (sender, receiver) = oneshot::channel();
787 let downstream = DownstreamRequest {
788 req,
789 sender,
790 metadata,
791 };
792 let active_cpu_time_us = Arc::new(AtomicU64::new(0));
793
794 let session = Session::new(downstream, active_cpu_time_us.clone(), self.clone());
795
796 if let Instance::Component(_) = self.instance_pre.as_ref() {
797 panic!("components not currently supported with `run`");
798 }
799
800 let (module, instance_pre) = self.instance_pre.unwrap_module();
801
802 let profiler = self.guest_profile_config.as_deref().map(|pcfg| {
803 GuestProfiler::new(
804 program_name,
805 pcfg.sample_period,
806 vec![(program_name.to_string(), module.clone())],
807 )
808 });
809
810 let mut store = create_store(&self, session, profiler, |builder| {
811 builder.arg(program_name);
812 for arg in args {
813 builder.arg(arg);
814 }
815 })
816 .map_err(ExecutionError::Context)?;
817
818 let instance = instance_pre
819 .instantiate_async(&mut store)
820 .await
821 .map_err(ExecutionError::Instantiation)?;
822
823 let main_func = instance
826 .get_typed_func::<(), ()>(&mut store, "_start")
827 .map_err(ExecutionError::Typechecking)?;
828
829 let result =
831 CpuTimeTracking::new(active_cpu_time_us, main_func.call_async(&mut store, ())).await;
832
833 write_profile(
835 &mut store,
836 self.guest_profile_config.as_deref().map(|cfg| &cfg.path),
837 );
838
839 store
842 .data_mut()
843 .close_downstream_response_sender(Response::default());
844
845 drop(receiver);
849
850 result
851 }
852
853 pub fn cache(&self) -> &Arc<Cache> {
854 &self.cache
855 }
856
857 pub fn config_path(&self) -> Option<&Path> {
858 self.config_path.as_deref()
859 }
860
861 pub fn object_store(&self) -> &ObjectStores {
862 &self.object_store
863 }
864
865 pub fn secret_stores(&self) -> &SecretStores {
866 &self.secret_stores
867 }
868
869 pub fn shielding_sites(&self) -> &ShieldingSites {
870 &self.shielding_sites
871 }
872
873 pub async fn register_pending_downstream(&self) -> Option<oneshot::Receiver<NextRequest>> {
874 let mut pending = self.pending_reuse.lock().await;
875
876 if pending.len() >= NEXT_REQ_PENDING_MAX {
877 return None;
878 }
879
880 let (tx, rx) = oneshot::channel();
881 pending.push(tx);
882
883 Some(rx)
884 }
885}
886
887pub struct ExecuteCtxBuilder {
888 inner: ExecuteCtx,
889}
890
891impl ExecuteCtxBuilder {
892 pub fn finish(self) -> Result<Arc<ExecuteCtx>, Error> {
893 Ok(Arc::new(self.inner))
894 }
895
896 pub fn with_acls(mut self, acls: Acls) -> Self {
898 self.inner.acls = acls;
899 self
900 }
901
902 pub fn with_backends(mut self, backends: Backends) -> Self {
904 self.inner.backends = backends;
905 self
906 }
907
908 pub fn with_device_detection(mut self, device_detection: DeviceDetection) -> Self {
910 self.inner.device_detection = device_detection;
911 self
912 }
913
914 pub fn with_geolocation(mut self, geolocation: Geolocation) -> Self {
916 self.inner.geolocation = geolocation;
917 self
918 }
919
920 pub fn with_dictionaries(mut self, dictionaries: Dictionaries) -> Self {
922 self.inner.dictionaries = dictionaries;
923 self
924 }
925
926 pub fn with_object_stores(mut self, object_store: ObjectStores) -> Self {
928 self.inner.object_store = object_store;
929 self
930 }
931
932 pub fn with_secret_stores(mut self, secret_stores: SecretStores) -> Self {
934 self.inner.secret_stores = secret_stores;
935 self
936 }
937 pub fn with_shielding_sites(mut self, shielding_sites: ShieldingSites) -> Self {
939 self.inner.shielding_sites = shielding_sites;
940 self
941 }
942
943 pub fn with_config_path(mut self, config_path: PathBuf) -> Self {
945 self.inner.config_path = Some(config_path);
946 self
947 }
948
949 pub fn with_capture_logs(mut self, capture_logs: Arc<Mutex<dyn Write + Send>>) -> Self {
952 self.inner.capture_logs = capture_logs;
953 self
954 }
955
956 pub fn with_log_stdout(mut self, log_stdout: bool) -> Self {
958 self.inner.log_stdout = log_stdout;
959 self
960 }
961
962 pub fn with_log_stderr(mut self, log_stderr: bool) -> Self {
964 self.inner.log_stderr = log_stderr;
965 self
966 }
967
968 pub fn with_local_pushpin_proxy_port(mut self, local_pushpin_proxy_port: Option<u16>) -> Self {
970 self.inner.local_pushpin_proxy_port = local_pushpin_proxy_port;
971 self
972 }
973}
974
975fn write_profile(store: &mut wasmtime::Store<WasmCtx>, guest_profile_path: Option<&PathBuf>) {
976 if let (Some(profile), Some(path)) =
977 (store.data_mut().take_guest_profiler(), guest_profile_path)
978 {
979 if let Err(e) = std::fs::File::create(path)
980 .map_err(anyhow::Error::new)
981 .and_then(|output| profile.finish(std::io::BufWriter::new(output)))
982 {
983 event!(
984 Level::ERROR,
985 "failed writing profile at {}: {e:#}",
986 path.display()
987 );
988 } else {
989 event!(
990 Level::INFO,
991 "\nProfile written to: {}\nView this profile at https://profiler.firefox.com/.",
992 path.display()
993 );
994 }
995 }
996}
997
998fn guest_result_to_response(resp: Response<Body>, err: Option<anyhow::Error>) -> Response<Body> {
999 err.as_ref().map(anyhow_response).unwrap_or(resp)
1000}
1001
1002fn exec_err_to_response(err: &ExecutionError) -> Response<Body> {
1003 if let ExecutionError::WasmTrap(e) = err {
1004 anyhow_response(e)
1005 } else {
1006 panic!("failed to run guest: {err}")
1007 }
1008}
1009
1010fn anyhow_response(err: &anyhow::Error) -> Response<Body> {
1011 Response::builder()
1012 .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
1013 .body(Body::from(format!("{err:?}").into_bytes()))
1014 .unwrap()
1015}
1016
1017impl Drop for ExecuteCtx {
1018 fn drop(&mut self) {
1019 if let Some(join_handle) = self.epoch_increment_thread.take() {
1020 self.epoch_increment_stop.store(true, Ordering::Relaxed);
1021 join_handle.join().unwrap();
1022 }
1023 }
1024}
1025
1026fn configure_wasmtime(
1027 allow_components: bool,
1028 profiling_strategy: ProfilingStrategy,
1029) -> wasmtime::Config {
1030 use wasmtime::{Config, InstanceAllocationStrategy, WasmBacktraceDetails};
1031
1032 let mut config = Config::new();
1033 config.debug_info(false); config.wasm_backtrace_details(WasmBacktraceDetails::Enable);
1035 config.async_support(true);
1036 config.epoch_interruption(true);
1037 config.profiler(profiling_strategy);
1038
1039 config.allocation_strategy(InstanceAllocationStrategy::OnDemand);
1040
1041 if allow_components {
1042 config.wasm_component_model(true);
1043 }
1044
1045 config.relaxed_simd_deterministic(true);
1053
1054 config
1055}
1056
1057#[pin_project]
1058struct CpuTimeTracking<F> {
1059 #[pin]
1060 future: F,
1061 time_spent: Arc<AtomicU64>,
1062}
1063
1064impl<F> CpuTimeTracking<F> {
1065 fn new(time_spent: Arc<AtomicU64>, future: F) -> Self {
1066 CpuTimeTracking { future, time_spent }
1067 }
1068}
1069
1070impl<E, F: Future<Output = Result<(), E>>> Future for CpuTimeTracking<F> {
1071 type Output = F::Output;
1072
1073 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1074 let me = self.project();
1075
1076 let start = Instant::now();
1077 let result = me.future.poll(cx);
1078 let runtime = start.elapsed().as_micros() as u64;
1081 let _ = me.time_spent.fetch_add(runtime, Ordering::SeqCst);
1082 result
1083 }
1084}