use {
crate::{
Error,
acl::Acls,
adapt,
body::Body,
body_tee::tee,
cache::Cache,
component as compute,
config::{
Backends, DeviceDetection, Dictionaries, ExperimentalModule, FakeValidFastlyKeys,
Geolocation, UnknownImportBehavior,
},
downstream::{DownstreamMetadata, DownstreamRequest, DownstreamResponse, prepare_request},
error::{ExecutionError, NonHttpResponse},
linking::{ComponentCtx, WasmCtx, create_store, link_host_functions},
object_store::ObjectStores,
pushpin::{PushpinRedirectRequestInfo, proxy_through_pushpin},
secret_store::SecretStores,
session::Session,
shielding_site::ShieldingSites,
upstream::TlsConfig,
},
futures::{
Future,
task::{Context, Poll},
},
http::StatusCode,
hyper::{Request, Response},
pin_project::pin_project,
std::{
collections::HashSet,
fmt, fs,
io::Write,
net::{Ipv4Addr, SocketAddr},
path::{Path, PathBuf},
pin::Pin,
sync::{
Arc, Mutex,
atomic::{AtomicBool, AtomicU64, Ordering},
},
thread::{self, JoinHandle},
time::{Duration, Instant, SystemTime},
},
tokio::sync::Mutex as AsyncMutex,
tokio::sync::oneshot::{self, Sender},
tracing::{Instrument, Level, error, event, info, info_span, warn},
wasmtime::{
Engine, GuestProfiler, InstancePre, Linker, Module, ProfilingStrategy,
component::{self, Component},
},
wasmtime_wasi::I32Exit,
};
pub use wasmtime::WasmFeatures;
/// Interval between epoch increments used by `ExecuteCtx::build` when no
/// guest profile configuration supplies its own sample period.
pub const DEFAULT_EPOCH_INTERRUPTION_PERIOD: Duration = Duration::from_micros(50);
/// Maximum number of senders kept in `ExecuteCtx::pending_reuse`; once reached,
/// `register_pending_downstream` returns `None`.
const NEXT_REQ_PENDING_MAX: usize = 5;
/// Value stored in `DownstreamMetadata::compliance_region` for requests built in this file.
const REGION_NONE: &str = "none";
/// The pre-instantiated guest program held by an `ExecuteCtx`, in one of the
/// two forms that `ExecuteCtx::build` can produce.
enum Instance {
/// A core Wasm module together with its pre-instantiated form.
Module(Module, InstancePre<WasmCtx>),
/// A Wasm component together with its pre-instantiated `AdapterServicePre` bindings.
Component(
Component,
compute::bindings::AdapterServicePre<ComponentCtx>,
),
}
impl Instance {
    /// Borrow the module and its pre-instantiated form.
    ///
    /// # Panics
    ///
    /// Panics when `self` is the `Component` variant.
    fn unwrap_module(&self) -> (&Module, &InstancePre<WasmCtx>) {
        if let Instance::Module(module, pre) = self {
            return (module, pre);
        }
        panic!("unwrap_module called on a component")
    }
}
/// Settings for guest profiling.
#[derive(Clone)]
pub struct GuestProfileConfig {
/// Where profiles are written. `run_guest` joins a per-request file name onto
/// this path, while `run_main` hands it to the profile writer unchanged.
pub path: PathBuf,
/// Sampling period passed to the `GuestProfiler`; `ExecuteCtx::build` also uses
/// it as the sleep interval of the epoch-increment thread.
pub sample_period: Duration,
}
/// A downstream request offered to a guest that registered for reuse, paired with
/// the `ExecuteCtx` that created it. If the value is dropped while it still holds
/// the request, the `Drop` impl passes the request to `ExecuteCtx::retry_request`.
pub struct NextRequest(Option<(Box<DownstreamRequest>, Arc<ExecuteCtx>)>);
impl NextRequest {
    /// Take ownership of the wrapped request, if it has not been taken already.
    ///
    /// The slot is emptied, so the `Drop` impl will not retry the request.
    pub fn into_request(mut self) -> Option<DownstreamRequest> {
        let (request, _ctx) = self.0.take()?;
        Some(*request)
    }
}
impl fmt::Debug for NextRequest {
    /// Formats only the request half of the pair; the context is omitted and the
    /// output is marked non-exhaustive.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let request = match &self.0 {
            Some((req, _)) => Some(req),
            None => None,
        };
        let mut tuple = f.debug_tuple("NextRequest");
        tuple.field(&request);
        tuple.finish_non_exhaustive()
    }
}
impl Drop for NextRequest {
    /// If the request was never taken out, hand it back to its context to be retried.
    fn drop(&mut self) {
        if let Some((req, ctx)) = self.0.take() {
            ctx.retry_request(*req);
        }
    }
}
/// Shared state needed to run guest programs: the wasmtime engine, the
/// pre-instantiated guest, and the configuration exposed to sessions.
pub struct ExecuteCtx {
/// The wasmtime engine created in `build`; its epoch is incremented by the background thread.
engine: Engine,
/// The pre-instantiated guest, either a module or a component.
instance_pre: Arc<Instance>,
/// ACL configuration (set through `ExecuteCtxBuilder::with_acls`).
acls: Acls,
/// Backend configuration (set through `ExecuteCtxBuilder::with_backends`).
backends: Backends,
/// Device detection configuration.
device_detection: DeviceDetection,
/// Geolocation configuration.
geolocation: Geolocation,
/// TLS configuration, created with `TlsConfig::new()` in `build`.
tls_config: TlsConfig,
/// Dictionary configuration.
dictionaries: Dictionaries,
/// Optional configuration path supplied through the builder.
config_path: Option<PathBuf>,
/// Shared writer handed out by `capture_logs()`; defaults to stdout.
capture_logs: Arc<Mutex<dyn Write + Send>>,
/// Flag returned by `log_stdout()`; defaults to `false`.
log_stdout: bool,
/// Flag returned by `log_stderr()`; defaults to `false`.
log_stderr: bool,
/// Localhost port used to proxy through Pushpin; when `None`, a Pushpin
/// redirect in `handle_request` produces a 500 response.
local_pushpin_proxy_port: Option<u16>,
/// Counter from which `handle_request` draws request ids.
next_req_id: Arc<AtomicU64>,
/// Object store configuration.
object_store: ObjectStores,
/// Secret store configuration.
secret_stores: SecretStores,
/// Shielding site configuration.
shielding_sites: ShieldingSites,
/// Fake-valid Fastly key configuration.
fake_valid_fastly_keys: FakeValidFastlyKeys,
/// Cache shared through `cache()`.
cache: Arc<Cache>,
/// Senders registered by `register_pending_downstream`; `reuse_or_spawn_guest`
/// offers new requests to these before spawning a fresh guest.
pending_reuse: Arc<AsyncMutex<Vec<Sender<NextRequest>>>>,
/// Handle of the thread that periodically increments the engine epoch; joined on drop.
epoch_increment_thread: Option<JoinHandle<()>>,
/// Flag set on drop to make the epoch-increment thread exit its loop.
epoch_increment_stop: Arc<AtomicBool>,
/// Guest profiling settings, if profiling was requested.
guest_profile_config: Option<Arc<GuestProfileConfig>>,
}
impl ExecuteCtx {
pub fn build(
module_path: impl AsRef<Path>,
profiling_strategy: ProfilingStrategy,
wasi_modules: HashSet<ExperimentalModule>,
guest_profile_config: Option<GuestProfileConfig>,
unknown_import_behavior: UnknownImportBehavior,
adapt_components: bool,
wasm_features: WasmFeatures,
) -> Result<ExecuteCtxBuilder, Error> {
let input = fs::read(&module_path)?;
let is_wat = module_path
.as_ref()
.extension()
.map(|str| str == "wat")
.unwrap_or(false);
let is_component = adapt::is_component(&input);
let (is_wat, is_component, input) = if !is_component && adapt_components {
let input = if is_wat {
let text = String::from_utf8(input).map_err(|_| {
anyhow::anyhow!("Failed to parse {}", module_path.as_ref().display())
})?;
adapt::adapt_wat(&text)?
} else {
adapt::adapt_bytes(&input)?
};
(false, true, input)
} else {
(is_wat, is_component, input)
};
let config = &configure_wasmtime(wasm_features, profiling_strategy);
let engine = Engine::new(config)?;
let instance_pre = if is_component {
warn!(
"
+------------------------------------------------------------------------+
| |
| Wasm Component support in viceroy is in active development, and is not |
| supported for general consumption. |
| |
+------------------------------------------------------------------------+
"
);
if !tracing::enabled!(Level::WARN) {
eprintln!(
"
+------------------------------------------------------------------------+
| |
| Wasm Component support in viceroy is in active development, and is not |
| supported for general consumption. |
| |
+------------------------------------------------------------------------+
"
);
}
let mut linker: component::Linker<ComponentCtx> = component::Linker::new(&engine);
compute::link_host_functions(&mut linker)?;
let component = if is_wat {
Component::from_file(&engine, &module_path)?
} else {
Component::from_binary(&engine, &input)?
};
match unknown_import_behavior {
UnknownImportBehavior::LinkError => (),
UnknownImportBehavior::Trap => {
linker.define_unknown_imports_as_traps(&component)?
}
}
let instance_pre = linker.instantiate_pre(&component)?;
Instance::Component(
component,
compute::bindings::AdapterServicePre::new(instance_pre)?,
)
} else {
let mut linker = Linker::new(&engine);
link_host_functions(&mut linker, &wasi_modules)?;
let module = if is_wat {
Module::from_file(&engine, &module_path)?
} else {
Module::from_binary(&engine, &input)?
};
match unknown_import_behavior {
UnknownImportBehavior::LinkError => (),
UnknownImportBehavior::Trap => linker.define_unknown_imports_as_traps(&module)?,
}
let instance_pre = linker.instantiate_pre(&module)?;
Instance::Module(module, instance_pre)
};
let epoch_increment_stop = Arc::new(AtomicBool::new(false));
let engine_clone = engine.clone();
let epoch_increment_stop_clone = epoch_increment_stop.clone();
let sample_period = guest_profile_config
.as_ref()
.map(|c| c.sample_period)
.unwrap_or(DEFAULT_EPOCH_INTERRUPTION_PERIOD);
let epoch_increment_thread = Some(thread::spawn(move || {
while !epoch_increment_stop_clone.load(Ordering::Relaxed) {
thread::sleep(sample_period);
engine_clone.increment_epoch();
}
}));
let inner = Self {
engine,
instance_pre: Arc::new(instance_pre),
acls: Acls::new(),
backends: Backends::default(),
device_detection: DeviceDetection::default(),
geolocation: Geolocation::default(),
tls_config: TlsConfig::new()?,
dictionaries: Dictionaries::default(),
config_path: None,
capture_logs: Arc::new(Mutex::new(std::io::stdout())),
log_stdout: false,
log_stderr: false,
local_pushpin_proxy_port: None,
next_req_id: Arc::new(AtomicU64::new(0)),
object_store: ObjectStores::new(),
secret_stores: SecretStores::new(),
shielding_sites: ShieldingSites::new(),
fake_valid_fastly_keys: FakeValidFastlyKeys::new(),
epoch_increment_thread,
epoch_increment_stop,
guest_profile_config: guest_profile_config.map(Arc::new),
cache: Arc::new(Cache::default()),
pending_reuse: Arc::new(AsyncMutex::new(vec![])),
};
Ok(ExecuteCtxBuilder { inner })
}
/// Convenience constructor: run [`ExecuteCtx::build`] with the given arguments
/// and immediately finish the builder, keeping every default setting.
pub fn new(
    module_path: impl AsRef<Path>,
    profiling_strategy: ProfilingStrategy,
    wasi_modules: HashSet<ExperimentalModule>,
    guest_profile_config: Option<GuestProfileConfig>,
    unknown_import_behavior: UnknownImportBehavior,
    adapt_components: bool,
    wasm_features: WasmFeatures,
) -> Result<Arc<Self>, Error> {
    let builder = ExecuteCtx::build(
        module_path,
        profiling_strategy,
        wasi_modules,
        guest_profile_config,
        unknown_import_behavior,
        adapt_components,
        wasm_features,
    )?;
    builder.finish()
}
/// Borrow the wasmtime engine owned by this context.
pub fn engine(&self) -> &Engine {
&self.engine
}
/// Borrow the ACL configuration.
pub fn acls(&self) -> &Acls {
&self.acls
}
/// Borrow the backend configuration.
pub fn backends(&self) -> &Backends {
&self.backends
}
/// Borrow the device detection configuration.
pub fn device_detection(&self) -> &DeviceDetection {
&self.device_detection
}
/// Borrow the geolocation configuration.
pub fn geolocation(&self) -> &Geolocation {
&self.geolocation
}
/// Borrow the dictionary configuration.
pub fn dictionaries(&self) -> &Dictionaries {
&self.dictionaries
}
/// Return another handle to the shared log-capture writer.
pub fn capture_logs(&self) -> Arc<Mutex<dyn Write + Send>> {
    Arc::clone(&self.capture_logs)
}
/// Return the `log_stdout` flag (set through `ExecuteCtxBuilder::with_log_stdout`).
pub fn log_stdout(&self) -> bool {
self.log_stdout
}
/// Return the `log_stderr` flag (set through `ExecuteCtxBuilder::with_log_stderr`).
pub fn log_stderr(&self) -> bool {
self.log_stderr
}
/// Borrow the TLS configuration created in `build`.
pub fn tls_config(&self) -> &TlsConfig {
&self.tls_config
}
/// Wait for the guest's downstream response.
///
/// Returns `None` when the sending half was dropped without sending. An HTTP
/// response is returned as-is with no error; a Pushpin redirect is returned as
/// an empty response paired with a `NonHttpResponse::PushpinRedirect` error.
async fn maybe_receive_response(
    receiver: oneshot::Receiver<DownstreamResponse>,
) -> Option<(Response<Body>, Option<anyhow::Error>)> {
    let received = receiver.await.ok()?;
    let pair = match received {
        DownstreamResponse::Http(resp) => (resp, None),
        DownstreamResponse::RedirectToPushpin(info) => (
            Response::new(Body::empty()),
            Some(NonHttpResponse::PushpinRedirect(info).into()),
        ),
    };
    Some(pair)
}
pub async fn handle_request(
self: Arc<Self>,
mut incoming_req: Request<hyper::Body>,
local: SocketAddr,
remote: SocketAddr,
) -> Result<(Response<Body>, Option<anyhow::Error>), Error> {
let orig_req_on_upgrade = hyper::upgrade::on(&mut incoming_req);
let (incoming_req_parts, incoming_req_body) = incoming_req.into_parts();
let local_pushpin_proxy_port = self.local_pushpin_proxy_port;
let (body_for_wasm, orig_body_tee) = tee(incoming_req_body).await;
let orig_request_info_for_pushpin =
PushpinRedirectRequestInfo::from_parts(&incoming_req_parts);
let original_headers = incoming_req_parts.headers.clone();
let req = prepare_request(Request::from_parts(incoming_req_parts, body_for_wasm))?;
let req_id = self
.next_req_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let metadata = DownstreamMetadata {
req_id,
server_addr: local,
client_addr: remote,
compliance_region: String::from(REGION_NONE),
original_headers,
};
let (resp, mut err) = self.reuse_or_spawn_guest(req, metadata).await;
let span = info_span!("request", id = req_id);
let _span = span.enter();
info!("response status: {:?}", resp.status());
if let Some(e) = err {
match e.downcast::<NonHttpResponse>() {
Ok(NonHttpResponse::PushpinRedirect(redirect_info)) => {
let backend_name = redirect_info.backend_name;
let redirect_request_info = redirect_info.request_info;
info!("Pushpin redirect signaled to backend '{}'", backend_name);
let local_pushpin_proxy_port = match local_pushpin_proxy_port {
None => {
error!("Pushpin redirect signaled, but Pushpin mode not enabled.");
let err = anyhow::anyhow!(
"Pushpin redirect signaled, but Pushpin mode not enabled."
);
let resp = Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(hyper::Body::from(err.to_string())))?;
return Ok((resp, Some(err)));
}
Some(port) => port,
};
let proxy_resp = proxy_through_pushpin(
SocketAddr::new(Ipv4Addr::LOCALHOST.into(), local_pushpin_proxy_port),
backend_name,
redirect_request_info,
orig_request_info_for_pushpin,
orig_body_tee,
orig_req_on_upgrade,
)
.await;
let (p, hyper_body) = proxy_resp.into_parts();
return Ok((Response::from_parts(p, Body::from(hyper_body)), None));
}
Err(e) => {
err = Some(e);
}
}
}
Ok((resp, err))
}
/// Re-run a request that was offered to a reusable guest but never processed.
///
/// Nothing happens if the original response sender is already closed.
/// Otherwise a task is spawned that runs the request in a new guest and
/// forwards the resulting HTTP response through the original sender.
pub(crate) fn retry_request(self: Arc<Self>, mut downstream: DownstreamRequest) {
    if !downstream.sender.is_closed() {
        tokio::task::spawn(async move {
            // Swap in a fresh channel for the guest; keep the original sender
            // so the eventual response can be relayed to the first caller.
            let (fresh_tx, fresh_rx) = oneshot::channel();
            let client_tx = std::mem::replace(&mut downstream.sender, fresh_tx);
            let (resp, err) = self.spawn_guest(downstream, fresh_rx).await;
            let reply = guest_result_to_response(resp, err);
            let _ = client_tx.send(DownstreamResponse::Http(reply));
        });
    }
}
/// Like [`ExecuteCtx::handle_request`], but folds a guest error into the
/// returned response: when an error is present the response is replaced by the
/// 500 response produced from that error.
pub async fn handle_request_with_runtime_error(
    self: Arc<Self>,
    incoming_req: Request<hyper::Body>,
    local: SocketAddr,
    remote: SocketAddr,
) -> Result<Response<Body>, Error> {
    let (resp, err) = self.handle_request(incoming_req, local, remote).await?;
    Ok(guest_result_to_response(resp, err))
}
/// Deliver the request to a guest that registered for reuse, or spawn a new
/// guest if none accepts it.
///
/// Registered senders are popped one at a time; a failed send gives the
/// request back so the next sender can be tried. After a successful hand-off
/// the response is awaited, falling back to a default response when the
/// guest's sender is dropped without replying.
async fn reuse_or_spawn_guest(
    self: Arc<Self>,
    req: Request<Body>,
    metadata: DownstreamMetadata,
) -> (Response<Body>, Option<anyhow::Error>) {
    let (sender, receiver) = oneshot::channel();
    let mut offer = NextRequest(Some((
        Box::new(DownstreamRequest {
            req,
            sender,
            metadata,
        }),
        self.clone(),
    )));
    let mut waiting = self.pending_reuse.lock().await;
    while let Some(slot) = waiting.pop() {
        offer = match slot.send(offer) {
            // The registered receiver is gone; recover the request and try the next one.
            Err(returned) => returned,
            Ok(()) => {
                // Release the lock before waiting on the guest.
                drop(waiting);
                return match Self::maybe_receive_response(receiver).await {
                    Some(response) => response,
                    None => (Response::default(), None),
                };
            }
        };
    }
    drop(waiting);
    let downstream = offer
        .into_request()
        .expect("request should still be unprocessed");
    self.spawn_guest(downstream, receiver).await
}
/// Run the request in a newly spawned guest task and wait for its response.
///
/// If the guest sends a downstream response, that response is returned right
/// away. Otherwise the guest task is awaited: success yields an empty
/// response, a Wasm trap yields a 500 response plus the error.
///
/// # Panics
///
/// Panics if the guest task panicked, or if the guest failed with an
/// `ExecutionError` other than `WasmTrap`.
async fn spawn_guest(
    self: Arc<Self>,
    downstream: DownstreamRequest,
    receiver: oneshot::Receiver<DownstreamResponse>,
) -> (Response<Body>, Option<anyhow::Error>) {
    let req_id = downstream.metadata.req_id;
    // Shared between the guest's session and the CPU-time tracking wrapper.
    let active_cpu_time_us = Arc::new(AtomicU64::new(0));
    let guest = self
        .run_guest(downstream, active_cpu_time_us.clone())
        .instrument(info_span!("request", id = req_id));
    let guest_handle = tokio::task::spawn(CpuTimeTracking::new(active_cpu_time_us, guest));

    if let Some(response) = Self::maybe_receive_response(receiver).await {
        return response;
    }

    let finished = guest_handle
        .await
        .expect("guest worker finished without panicking");
    match finished {
        Ok(()) => (Response::new(Body::empty()), None),
        Err(ExecutionError::WasmTrap(e)) => {
            event!(
                Level::ERROR,
                "There was an error handling the request {}",
                e.to_string()
            );
            let resp = anyhow_response(&e);
            (resp, Some(e))
        }
        Err(e) => panic!("failed to run guest: {}", e),
    }
}
/// Instantiate the guest for one downstream request and run it to completion.
///
/// A `Session` is created for the request, then the guest is instantiated from
/// the pre-instantiated module or component and its entry point is called. An
/// `I32Exit` with code 0 counts as success; any other exit code or error is
/// reported as `ExecutionError::WasmTrap`.
///
/// After the guest finishes, the guest profile (if profiling is configured) is
/// written, and the response sender is closed with either a default response
/// or the 500 response built from the error.
///
/// Note that the `?` early returns (store creation, instantiation, and
/// typechecking failures) skip both the profile write and the explicit
/// `close_downstream_response_sender` call.
async fn run_guest(
self: Arc<Self>,
downstream: DownstreamRequest,
active_cpu_time_us: Arc<AtomicU64>,
) -> Result<(), ExecutionError> {
info!(
"handling request {} {}",
downstream.req.method(),
downstream.req.uri()
);
let start_timestamp = Instant::now();
let req_id = downstream.metadata.req_id;
let session = Session::new(downstream, active_cpu_time_us, self.clone());
// Profile file name: `<seconds since Unix epoch>-<request id>.json` under the configured path.
let guest_profile_path = self.guest_profile_config.as_deref().map(|pcfg| {
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
pcfg.path.join(format!("{}-{}.json", now, req_id))
});
match self.instance_pre.as_ref() {
Instance::Component(component, instance_pre) => {
let profiler = self.guest_profile_config.as_deref().map(|pcfg| {
let program_name = "main";
GuestProfiler::new_component(
program_name,
pcfg.sample_period,
component.clone(),
std::iter::empty(),
)
});
// Obtain the request and body from the session before it is moved into the store.
let req = session.downstream_request();
let body = session.downstream_request_body();
let mut store = ComponentCtx::create_store(&self, session, profiler, |ctx| {
ctx.arg("compute-app");
})
.map_err(ExecutionError::Context)?;
let compute = instance_pre
.instantiate_async(&mut store)
.await
.map_err(ExecutionError::Instantiation)?;
let result = compute
.fastly_compute_http_incoming()
.call_handle(&mut store, req.into(), body.into())
.await;
let outcome = match result {
Ok(Ok(())) => Ok(()),
Ok(Err(())) => {
event!(Level::ERROR, "WebAssembly exited with an error");
Err(ExecutionError::WasmTrap(anyhow::Error::msg("failed")))
}
Err(e) => {
// An `I32Exit` carrying code 0 is a normal, successful exit.
if let Some(exit) = e.downcast_ref::<I32Exit>() {
if exit.0 == 0 {
Ok(())
} else {
event!(Level::ERROR, "WebAssembly exited with error: {:?}", e);
Err(ExecutionError::WasmTrap(e))
}
} else {
event!(Level::ERROR, "WebAssembly trapped: {:?}", e);
Err(ExecutionError::WasmTrap(e))
}
}
};
write_profile_component(&mut store, guest_profile_path.as_ref());
// On failure pass a 500 response built from the error; otherwise a default response.
let resp = outcome
.as_ref()
.err()
.map(exec_err_to_response)
.unwrap_or_default();
store
.data_mut()
.session
.close_downstream_response_sender(resp);
let request_duration = Instant::now().duration_since(start_timestamp);
info!(
"guest completed using {} of WebAssembly heap",
bytesize::ByteSize::b(store.data().limiter().memory_allocated as u64),
);
info!("guest completed in {:.0?}", request_duration);
outcome
}
Instance::Module(module, instance_pre) => {
let profiler = self.guest_profile_config.as_deref().map(|pcfg| {
let program_name = "main";
GuestProfiler::new(
program_name,
pcfg.sample_period,
vec![(program_name.to_string(), module.clone())],
)
});
let mut store = create_store(&self, session, profiler, |ctx| {
ctx.arg("compute-app");
})
.map_err(ExecutionError::Context)?;
let instance = instance_pre
.instantiate_async(&mut store)
.await
.map_err(ExecutionError::Instantiation)?;
// Core modules are entered through their `_start` export.
let main_func = instance
.get_typed_func::<(), ()>(&mut store, "_start")
.map_err(ExecutionError::Typechecking)?;
let outcome = match main_func.call_async(&mut store, ()).await {
Ok(_) => Ok(()),
Err(e) => {
// An `I32Exit` carrying code 0 is a normal, successful exit.
if let Some(exit) = e.downcast_ref::<I32Exit>() {
if exit.0 == 0 {
Ok(())
} else {
event!(Level::ERROR, "WebAssembly exited with error: {:?}", e);
Err(ExecutionError::WasmTrap(e))
}
} else {
event!(Level::ERROR, "WebAssembly trapped: {:?}", e);
Err(ExecutionError::WasmTrap(e))
}
}
};
write_profile(&mut store, guest_profile_path.as_ref());
// On failure pass a 500 response built from the error; otherwise a default response.
let resp = outcome
.as_ref()
.err()
.map(exec_err_to_response)
.unwrap_or_default();
store.data_mut().close_downstream_response_sender(resp);
let request_duration = Instant::now().duration_since(start_timestamp);
info!(
"request completed using {} of WebAssembly heap",
bytesize::ByteSize::b(store.data().limiter().memory_allocated as u64)
);
info!("request completed in {:.0?}", request_duration);
outcome
}
}
}
/// Run the guest module's `_start` function as a standalone program.
///
/// `program_name` becomes the first argument seen by the guest, followed by
/// `args`. A placeholder downstream request (`GET http://example.com/`, request
/// id 0) backs the session. When profiling is configured, the profile is
/// written to the configured path after the call returns.
///
/// # Errors
///
/// Returns an error when the context holds a component (only core modules are
/// supported here), when store creation, instantiation, or typechecking of
/// `_start` fails, or when the guest call itself fails.
pub async fn run_main(
    self: Arc<Self>,
    program_name: &str,
    args: &[String],
) -> Result<(), anyhow::Error> {
    // Reject components up front with an error instead of panicking: the
    // caller already handles `Err`, and this condition depends on user input.
    if self.is_component() {
        anyhow::bail!("components not currently supported with `run`");
    }
    let (module, instance_pre) = self.instance_pre.unwrap_module();

    let req = Request::get("http://example.com/").body(Body::empty())?;
    let metadata = DownstreamMetadata {
        req_id: 0,
        server_addr: (Ipv4Addr::LOCALHOST, 80).into(),
        client_addr: (Ipv4Addr::LOCALHOST, 0).into(),
        compliance_region: String::from(REGION_NONE),
        original_headers: Default::default(),
    };
    let (sender, receiver) = oneshot::channel();
    let downstream = DownstreamRequest {
        req,
        sender,
        metadata,
    };
    let active_cpu_time_us = Arc::new(AtomicU64::new(0));
    let session = Session::new(downstream, active_cpu_time_us.clone(), self.clone());
    let profiler = self.guest_profile_config.as_deref().map(|pcfg| {
        GuestProfiler::new(
            program_name,
            pcfg.sample_period,
            vec![(program_name.to_string(), module.clone())],
        )
    });
    let mut store = create_store(&self, session, profiler, |builder| {
        builder.arg(program_name);
        for arg in args {
            builder.arg(arg);
        }
    })
    .map_err(ExecutionError::Context)?;
    let instance = instance_pre
        .instantiate_async(&mut store)
        .await
        .map_err(ExecutionError::Instantiation)?;
    let main_func = instance
        .get_typed_func::<(), ()>(&mut store, "_start")
        .map_err(ExecutionError::Typechecking)?;
    let result =
        CpuTimeTracking::new(active_cpu_time_us, main_func.call_async(&mut store, ())).await;
    write_profile(
        &mut store,
        self.guest_profile_config.as_deref().map(|cfg| &cfg.path),
    );
    store
        .data_mut()
        .close_downstream_response_sender(Response::default());
    // The receiver is kept alive until here so the sender stays open for the whole run.
    drop(receiver);
    result
}
/// Borrow the shared cache handle.
pub fn cache(&self) -> &Arc<Cache> {
&self.cache
}
/// Return the configuration path, if one was set through the builder.
pub fn config_path(&self) -> Option<&Path> {
self.config_path.as_deref()
}
/// Borrow the object store configuration.
pub fn object_store(&self) -> &ObjectStores {
&self.object_store
}
/// Borrow the secret store configuration.
pub fn secret_stores(&self) -> &SecretStores {
&self.secret_stores
}
/// Borrow the shielding site configuration.
pub fn shielding_sites(&self) -> &ShieldingSites {
&self.shielding_sites
}
/// Borrow the fake-valid Fastly key configuration.
pub fn fake_valid_fastly_keys(&self) -> &FakeValidFastlyKeys {
&self.fake_valid_fastly_keys
}
/// Register interest in receiving a future downstream request.
///
/// Returns the receiving end of a new one-shot channel whose sender is queued
/// in `pending_reuse`, or `None` when `NEXT_REQ_PENDING_MAX` senders are
/// already queued.
pub async fn register_pending_downstream(&self) -> Option<oneshot::Receiver<NextRequest>> {
    let mut waiting = self.pending_reuse.lock().await;
    if waiting.len() < NEXT_REQ_PENDING_MAX {
        let (tx, rx) = oneshot::channel();
        waiting.push(tx);
        Some(rx)
    } else {
        None
    }
}
/// Return `true` when the loaded guest is a Wasm component rather than a core module.
pub fn is_component(&self) -> bool {
matches!(self.instance_pre.as_ref(), Instance::Component(_, _))
}
}
/// Builder returned by `ExecuteCtx::build`; wraps a not-yet-shared `ExecuteCtx`
/// so its configuration can be replaced before `finish` wraps it in an `Arc`.
pub struct ExecuteCtxBuilder {
/// The context being configured.
inner: ExecuteCtx,
}
// Each `with_*` method overwrites one field of the wrapped context and returns
// the builder so calls can be chained.
impl ExecuteCtxBuilder {
/// Finish building and wrap the context in an `Arc`. As written, this always returns `Ok`.
pub fn finish(self) -> Result<Arc<ExecuteCtx>, Error> {
Ok(Arc::new(self.inner))
}
/// Replace the ACL configuration.
pub fn with_acls(mut self, acls: Acls) -> Self {
self.inner.acls = acls;
self
}
/// Replace the backend configuration.
pub fn with_backends(mut self, backends: Backends) -> Self {
self.inner.backends = backends;
self
}
/// Replace the device detection configuration.
pub fn with_device_detection(mut self, device_detection: DeviceDetection) -> Self {
self.inner.device_detection = device_detection;
self
}
/// Replace the geolocation configuration.
pub fn with_geolocation(mut self, geolocation: Geolocation) -> Self {
self.inner.geolocation = geolocation;
self
}
/// Replace the dictionary configuration.
pub fn with_dictionaries(mut self, dictionaries: Dictionaries) -> Self {
self.inner.dictionaries = dictionaries;
self
}
/// Replace the object store configuration.
pub fn with_object_stores(mut self, object_store: ObjectStores) -> Self {
self.inner.object_store = object_store;
self
}
/// Replace the secret store configuration.
pub fn with_secret_stores(mut self, secret_stores: SecretStores) -> Self {
self.inner.secret_stores = secret_stores;
self
}
/// Replace the shielding site configuration.
pub fn with_shielding_sites(mut self, shielding_sites: ShieldingSites) -> Self {
self.inner.shielding_sites = shielding_sites;
self
}
/// Replace the fake-valid Fastly key configuration.
pub fn with_fake_valid_fastly_keys(
mut self,
fake_valid_fastly_keys: FakeValidFastlyKeys,
) -> Self {
self.inner.fake_valid_fastly_keys = fake_valid_fastly_keys;
self
}
/// Set the configuration path reported by `ExecuteCtx::config_path`.
pub fn with_config_path(mut self, config_path: PathBuf) -> Self {
self.inner.config_path = Some(config_path);
self
}
/// Replace the shared log-capture writer (stdout by default).
pub fn with_capture_logs(mut self, capture_logs: Arc<Mutex<dyn Write + Send>>) -> Self {
self.inner.capture_logs = capture_logs;
self
}
/// Set the flag reported by `ExecuteCtx::log_stdout`.
pub fn with_log_stdout(mut self, log_stdout: bool) -> Self {
self.inner.log_stdout = log_stdout;
self
}
/// Set the flag reported by `ExecuteCtx::log_stderr`.
pub fn with_log_stderr(mut self, log_stderr: bool) -> Self {
self.inner.log_stderr = log_stderr;
self
}
/// Set (or clear) the localhost port used when proxying through Pushpin.
pub fn with_local_pushpin_proxy_port(mut self, local_pushpin_proxy_port: Option<u16>) -> Self {
self.inner.local_pushpin_proxy_port = local_pushpin_proxy_port;
self
}
}
fn write_profile_to_file(profile: Box<GuestProfiler>, path: &PathBuf) {
match std::fs::File::create(path)
.map_err(anyhow::Error::new)
.and_then(|output| profile.finish(std::io::BufWriter::new(output)))
{
Err(e) => {
event!(
Level::ERROR,
"failed writing profile at {}: {e:#}",
path.display()
);
}
_ => {
event!(
Level::INFO,
"\nProfile written to: {}\nView this profile at https://profiler.firefox.com/.",
path.display()
);
}
}
}
/// Take the guest profiler out of a module store and, when a destination path
/// is given, write the profile there. The profiler is taken even when no path
/// is supplied.
fn write_profile(store: &mut wasmtime::Store<WasmCtx>, guest_profile_path: Option<&PathBuf>) {
    let profiler = store.data_mut().take_guest_profiler();
    if let Some((profile, path)) = profiler.zip(guest_profile_path) {
        write_profile_to_file(profile, path);
    }
}
/// Take the guest profiler out of a component store and, when a destination
/// path is given, write the profile there. The profiler is taken even when no
/// path is supplied.
fn write_profile_component(
    store: &mut wasmtime::Store<ComponentCtx>,
    guest_profile_path: Option<&PathBuf>,
) {
    let profiler = store.data_mut().take_guest_profiler();
    if let Some((profile, path)) = profiler.zip(guest_profile_path) {
        write_profile_to_file(profile, path);
    }
}
fn guest_result_to_response(resp: Response<Body>, err: Option<anyhow::Error>) -> Response<Body> {
err.as_ref().map(anyhow_response).unwrap_or(resp)
}
/// Build the 500 response for a guest trap.
///
/// # Panics
///
/// Panics for any `ExecutionError` other than `WasmTrap`.
fn exec_err_to_response(err: &ExecutionError) -> Response<Body> {
    match err {
        ExecutionError::WasmTrap(trap) => anyhow_response(trap),
        other => panic!("failed to run guest: {other}"),
    }
}
/// Build a 500 Internal Server Error response whose body is the `Debug`
/// rendering of `err`.
fn anyhow_response(err: &anyhow::Error) -> Response<Body> {
    let rendered = format!("{err:?}");
    let mut resp = Response::new(Body::from(rendered.into_bytes()));
    *resp.status_mut() = hyper::StatusCode::INTERNAL_SERVER_ERROR;
    resp
}
impl Drop for ExecuteCtx {
    /// Signal the epoch-increment thread to stop and wait for it to exit.
    fn drop(&mut self) {
        let Some(worker) = self.epoch_increment_thread.take() else {
            return;
        };
        self.epoch_increment_stop.store(true, Ordering::Relaxed);
        worker.join().unwrap();
    }
}
/// Build the wasmtime `Config` used by `ExecuteCtx::build`.
///
/// Debug info is disabled, backtrace details, async support, and epoch
/// interruption are enabled, instances are allocated on demand, the given
/// Wasm features are switched on, and relaxed SIMD is made deterministic.
fn configure_wasmtime(
    wasm_features: WasmFeatures,
    profiling_strategy: ProfilingStrategy,
) -> wasmtime::Config {
    use wasmtime::{Config, InstanceAllocationStrategy, WasmBacktraceDetails};

    let mut cfg = Config::new();
    cfg.debug_info(false)
        .wasm_backtrace_details(WasmBacktraceDetails::Enable)
        .async_support(true)
        .epoch_interruption(true)
        .profiler(profiling_strategy)
        .allocation_strategy(InstanceAllocationStrategy::OnDemand);
    cfg.wasm_features(wasm_features, true);
    cfg.relaxed_simd_deterministic(true);
    cfg
}
/// Future wrapper that adds the wall-clock time spent inside each `poll` of the
/// inner future, in microseconds, to a shared counter.
#[pin_project]
struct CpuTimeTracking<F> {
/// The wrapped future.
#[pin]
future: F,
/// Running total of time spent polling, in microseconds.
time_spent: Arc<AtomicU64>,
}
impl<F> CpuTimeTracking<F> {
/// Wrap `future`, accumulating its poll time into `time_spent`.
fn new(time_spent: Arc<AtomicU64>, future: F) -> Self {
CpuTimeTracking { future, time_spent }
}
}
impl<E, F: Future<Output = Result<(), E>>> Future for CpuTimeTracking<F> {
    type Output = F::Output;

    /// Poll the inner future, adding the elapsed time of this poll (in
    /// microseconds) to the shared counter before returning its result.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let began = Instant::now();
        let outcome = this.future.poll(cx);
        let spent_us = began.elapsed().as_micros() as u64;
        let _ = this.time_spent.fetch_add(spent_us, Ordering::SeqCst);
        outcome
    }
}