use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
#[cfg(feature = "embedded")]
use crate::cache::{CacheKey, InstancePreCache};
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use wasmtime::component::{Accessor, Component, HasSelf, Linker, ResourceTable};
use wasmtime::{Config, Engine, ResourceLimiter, Store};
use wasmtime_wasi::{DirPerms, FilePerms, WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
use crate::callback::Callback;
use crate::error::Error;
use crate::trace::TraceEvent;
/// A host-callback invocation forwarded from the guest to the embedder.
#[derive(Debug)]
pub struct CallbackRequest {
    /// Name of the callback the guest wants to invoke.
    pub name: String,
    /// Callback arguments, JSON-encoded by the guest.
    pub arguments_json: String,
    /// One-shot channel the handler answers on with `Ok(result_json)`
    /// or `Err(message)`.
    pub response_tx: oneshot::Sender<std::result::Result<String, String>>,
}
/// A trace event reported by the guest (see `SandboxImports::report_trace`).
#[derive(Debug, Clone)]
pub struct TraceRequest {
    /// Source line number the event was reported from.
    pub lineno: u32,
    /// Event payload, JSON-encoded by the guest.
    pub event_json: String,
    /// Extra context for the event, JSON-encoded by the guest.
    pub context_json: String,
}
/// A chunk of streamed guest output (see `SandboxImports::report_output`).
#[derive(Debug, Clone)]
pub struct OutputRequest {
    /// Stream identifier as reported by the guest — presumably distinguishes
    /// stdout from stderr; the numbering is defined by the guest side (TODO confirm).
    pub stream: u32,
    /// The output text for this chunk.
    pub data: String,
}
/// Network operations proxied from the guest to the embedder's network handler.
///
/// Connect/read/write/upgrade variants carry a one-shot `response_tx` the
/// handler must answer on; the close variants are fire-and-forget.
#[derive(Debug)]
pub enum NetRequest {
    /// Open a TCP connection to `host:port`; replies with an opaque handle.
    TcpConnect {
        host: String,
        port: u16,
        response_tx: oneshot::Sender<Result<u32, crate::net::TcpError>>,
    },
    /// Read up to `len` bytes from the connection identified by `handle`.
    TcpRead {
        handle: u32,
        len: u32,
        response_tx: oneshot::Sender<Result<Vec<u8>, crate::net::TcpError>>,
    },
    /// Write `data` to the connection; replies with the byte count written.
    TcpWrite {
        handle: u32,
        data: Vec<u8>,
        response_tx: oneshot::Sender<Result<u32, crate::net::TcpError>>,
    },
    /// Close the TCP connection; no reply expected.
    TcpClose {
        handle: u32,
    },
    /// Upgrade an existing TCP connection to TLS against `hostname`;
    /// replies with a new TLS handle.
    TlsUpgrade {
        tcp_handle: u32,
        hostname: String,
        response_tx: oneshot::Sender<Result<u32, crate::net::TlsError>>,
    },
    /// Read up to `len` bytes from the TLS stream.
    TlsRead {
        handle: u32,
        len: u32,
        response_tx: oneshot::Sender<Result<Vec<u8>, crate::net::TlsError>>,
    },
    /// Write `data` to the TLS stream; replies with the byte count written.
    TlsWrite {
        handle: u32,
        data: Vec<u8>,
        response_tx: oneshot::Sender<Result<u32, crate::net::TlsError>>,
    },
    /// Close the TLS stream; no reply expected.
    TlsClose {
        handle: u32,
    },
}
/// Metadata describing one host callback, exposed to the guest via
/// `SandboxImports::list_callbacks`.
#[derive(Debug, Clone)]
pub struct HostCallbackInfo {
    /// Callback name the guest invokes it by.
    pub name: String,
    /// Human-readable description of what the callback does.
    pub description: String,
    /// JSON-encoded schema of the callback's parameters.
    pub parameters_schema_json: String,
}
/// The result of one sandboxed execution.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ExecutionOutput {
    /// Captured standard output.
    pub stdout: String,
    /// Captured standard error.
    pub stderr: String,
    /// Peak linear-memory usage observed during the run, in bytes.
    pub peak_memory_bytes: u64,
    /// Wall-clock duration of the run.
    pub duration: Duration,
    /// Number of host-callback invocations made by the guest.
    pub callback_invocations: u32,
    /// Fuel consumed, when a fuel limit was configured.
    pub fuel_consumed: Option<u64>,
}
impl ExecutionOutput {
    /// Creates an `ExecutionOutput` from the collected results of a run.
    ///
    /// The struct is `#[non_exhaustive]`, so this constructor is the only way
    /// for external callers to build one.
    #[must_use]
    pub fn new(
        stdout: String,
        stderr: String,
        peak_memory_bytes: u64,
        duration: Duration,
        callback_invocations: u32,
        fuel_consumed: Option<u64>,
    ) -> Self {
        Self {
            stdout,
            stderr,
            peak_memory_bytes,
            duration,
            callback_invocations,
            fuel_consumed,
        }
    }
}
/// x86-64 CPU feature level used to constrain code generation when
/// (pre)compiling components, so artifacts run on older CPUs.
///
/// `Native` (the default) places no restriction and targets the host CPU.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CpuFeatureLevel {
    /// Baseline x86-64 (accepted spellings: `x86-64`, `x86-64-v1`).
    X86_64,
    /// `x86-64-v2`.
    X86_64v2,
    /// `x86-64-v3`.
    X86_64v3,
    /// `x86-64-v4`.
    X86_64v4,
    /// No restriction; use the host CPU's features (default).
    #[default]
    Native,
}
impl CpuFeatureLevel {
    /// Parses a feature-level name; returns `None` for unknown names.
    ///
    /// `"x86-64"` and `"x86-64-v1"` are synonyms for the baseline level.
    #[must_use]
    pub fn parse(s: &str) -> Option<Self> {
        match s {
            "x86-64" | "x86-64-v1" => Some(Self::X86_64),
            "x86-64-v2" => Some(Self::X86_64v2),
            "x86-64-v3" => Some(Self::X86_64v3),
            "x86-64-v4" => Some(Self::X86_64v4),
            "native" => Some(Self::Native),
            _ => None,
        }
    }
    /// Canonical string form of this level (inverse of [`Self::parse`],
    /// modulo the `x86-64-v1` synonym).
    #[must_use]
    pub const fn as_str(&self) -> &'static str {
        match self {
            Self::X86_64 => "x86-64",
            Self::X86_64v2 => "x86-64-v2",
            Self::X86_64v3 => "x86-64-v3",
            Self::X86_64v4 => "x86-64-v4",
            Self::Native => "native",
        }
    }
}
impl std::fmt::Display for CpuFeatureLevel {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_str())
    }
}
impl std::str::FromStr for CpuFeatureLevel {
    type Err = String;
    /// Like [`CpuFeatureLevel::parse`], but returns a descriptive error
    /// message listing the valid values.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Self::parse(s).ok_or_else(|| {
            // Inline format arg for consistency with the rest of the file
            // (clippy::uninlined_format_args).
            format!(
                "Unknown CPU feature level '{s}'. Valid values: x86-64, x86-64-v2, x86-64-v3, x86-64-v4, native"
            )
        })
    }
}
/// Records the peak linear-memory size requested by a guest instance and
/// optionally carries an upper bound enforced by its `ResourceLimiter` impl.
#[derive(Debug)]
pub struct MemoryTracker {
    peak_memory_bytes: AtomicU64,
    memory_limit: Option<u64>,
}
impl MemoryTracker {
    /// Creates a tracker with an optional memory limit, in bytes.
    #[must_use]
    pub fn new(memory_limit: Option<u64>) -> Self {
        Self {
            // AtomicU64::default() starts at zero, same as AtomicU64::new(0).
            peak_memory_bytes: AtomicU64::default(),
            memory_limit,
        }
    }
    /// Largest memory size observed so far, in bytes.
    #[must_use]
    pub fn peak_memory_bytes(&self) -> u64 {
        self.peak_memory_bytes.load(Ordering::Relaxed)
    }
    /// Clears the recorded peak back to zero.
    pub fn reset(&self) {
        self.peak_memory_bytes.store(0, Ordering::Relaxed);
    }
}
/// Wasmtime resource limiter: tracks peak memory and enforces the optional
/// per-execution memory limit.
impl ResourceLimiter for MemoryTracker {
    /// Called by wasmtime before linear memory grows to `desired` bytes.
    /// Returns `Ok(false)` (deny growth) when `desired` exceeds either the
    /// configured limit or the instance's declared `maximum`.
    fn memory_growing(
        &mut self,
        _current: usize,
        desired: usize,
        maximum: Option<usize>,
    ) -> anyhow::Result<bool> {
        let desired_u64 = desired as u64;
        // NOTE(review): the peak is recorded before the limit checks, so a
        // denied growth attempt still raises the reported peak — confirm this
        // accounting is intentional.
        self.peak_memory_bytes
            .fetch_max(desired_u64, Ordering::Relaxed);
        if self.memory_limit.is_some_and(|limit| desired_u64 > limit) {
            return Ok(false);
        }
        if maximum.is_some_and(|max| desired > max) {
            return Ok(false);
        }
        Ok(true)
    }
    /// Table growth is only bounded by the instance's declared `maximum`;
    /// no tracking is performed.
    fn table_growing(
        &mut self,
        _current: usize,
        desired: usize,
        maximum: Option<usize>,
    ) -> anyhow::Result<bool> {
        if maximum.is_some_and(|max| desired > max) {
            return Ok(false);
        }
        Ok(true)
    }
}
// Generate Rust bindings for the sandbox WIT world defined under `wit/`.
// The listed `eryx:net` functions are bound as async host imports so this
// host can service them over channels without blocking the guest.
wasmtime::component::bindgen!({
    path: "wit",
    imports: {
        "eryx:net/tcp.connect": async,
        "eryx:net/tcp.read": async,
        "eryx:net/tcp.write": async,
        "eryx:net/tcp.close": async,
        "eryx:net/tls.upgrade": async,
        "eryx:net/tls.read": async,
        "eryx:net/tls.write": async,
        "eryx:net/tls.close": async,
    },
});
/// Per-execution store state: WASI context plus the channels connecting the
/// guest's host imports to the embedder's handlers.
pub struct ExecutorState {
    // WASI context (stdio, env, preopens) for this execution.
    pub(crate) wasi: WasiCtx,
    // Wasmtime resource table backing WASI (and VFS) resources.
    pub(crate) table: ResourceTable,
    // Where guest callback invocations are sent; `None` = no handler installed.
    pub(crate) callback_tx: Option<mpsc::Sender<CallbackRequest>>,
    // Where guest trace events are sent; `None` disables tracing.
    pub(crate) trace_tx: Option<mpsc::UnboundedSender<TraceRequest>>,
    // Callback metadata exposed to the guest via `list_callbacks`.
    pub(crate) callbacks: Vec<HostCallbackInfo>,
    // Peak-memory tracker / limiter for this store.
    pub(crate) memory_tracker: MemoryTracker,
    // Where guest network requests are sent; `None` = networking disabled.
    pub(crate) net_tx: Option<mpsc::Sender<NetRequest>>,
    // Where streamed guest output is sent; `None` disables streaming.
    pub(crate) output_tx: Option<mpsc::UnboundedSender<OutputRequest>>,
    // Hybrid VFS context; `None` when the VFS is not configured.
    #[cfg(feature = "vfs")]
    pub(crate) hybrid_vfs_ctx: Option<eryx_vfs::HybridVfsCtx<eryx_vfs::ArcStorage>>,
}
impl std::fmt::Debug for ExecutorState {
    /// Manual `Debug`: the WASI context and resource table are opaque, so
    /// only presence/size information is printed for each field.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut ds = f.debug_struct("ExecutorState");
        ds.field("wasi", &"<WasiCtx>");
        ds.field("table", &"<ResourceTable>");
        ds.field("callback_tx", &self.callback_tx.is_some());
        ds.field("trace_tx", &self.trace_tx.is_some());
        ds.field("callbacks", &self.callbacks.len());
        ds.field(
            "peak_memory_bytes",
            &self.memory_tracker.peak_memory_bytes(),
        );
        ds.field("net_tx", &self.net_tx.is_some());
        ds.field("output_tx", &self.output_tx.is_some());
        #[cfg(feature = "vfs")]
        ds.field("hybrid_vfs_ctx", &self.hybrid_vfs_ctx.is_some());
        ds.finish()
    }
}
/// Gives wasmtime-wasi access to this state's WASI context and resource table.
impl WasiView for ExecutorState {
    fn ctx(&mut self) -> WasiCtxView<'_> {
        WasiCtxView {
            ctx: &mut self.wasi,
            table: &mut self.table,
        }
    }
}
/// Gives the hybrid VFS access to its context and the shared resource table.
///
/// # Panics
/// Panics if called when `hybrid_vfs_ctx` is `None` — i.e. the VFS host
/// functions were linked but the executor was built without a VFS context.
#[cfg(feature = "vfs")]
impl eryx_vfs::HybridVfsView for ExecutorState {
    type Storage = eryx_vfs::ArcStorage;
    #[allow(clippy::expect_used)]
    fn hybrid_vfs(&mut self) -> eryx_vfs::HybridVfsState<'_, Self::Storage> {
        eryx_vfs::HybridVfsState::new(
            self.hybrid_vfs_ctx
                .as_mut()
                .expect("Hybrid VFS not configured for this executor"),
            &mut self.table,
        )
    }
}
/// Store-aware host import: lets the guest invoke a named host callback.
impl SandboxImportsWithStore for HasSelf<ExecutorState> {
    /// Forwards a guest callback invocation to the embedder's handler over
    /// `callback_tx` and awaits the JSON result.
    ///
    /// Returns `Err` with a descriptive message when no handler is installed
    /// or when either side of the request/response channel is closed.
    fn invoke<T>(
        accessor: &Accessor<T, Self>,
        name: String,
        arguments_json: String,
    ) -> impl ::core::future::Future<Output = Result<String, String>> + Send {
        // Logged eagerly, before the returned future is first polled.
        tracing::debug!(
            callback = %name,
            args_len = arguments_json.len(),
            "Python invoking callback"
        );
        async move {
            // Clone the sender inside the store access so the store borrow is
            // released before awaiting.
            if let Some(tx) = accessor.with(|mut access| access.get().callback_tx.clone()) {
                let (response_tx, response_rx) = oneshot::channel();
                let request = CallbackRequest {
                    name: name.clone(),
                    arguments_json,
                    response_tx,
                };
                if tx.send(request).await.is_err() {
                    Err("Callback channel closed".to_string())
                } else {
                    // A dropped response_tx (handler bailed) becomes an Err.
                    response_rx
                        .await
                        .unwrap_or_else(|_| Err("Callback response channel closed".to_string()))
                }
            } else {
                Err(format!("Callback '{name}' not available (no handler)"))
            }
        }
    }
}
/// Synchronous host imports: callback discovery, tracing, and output streaming.
impl SandboxImports for ExecutorState {
    /// Returns metadata for every registered host callback.
    fn list_callbacks(&mut self) -> Vec<CallbackInfo> {
        let mut infos = Vec::with_capacity(self.callbacks.len());
        for cb in &self.callbacks {
            infos.push(CallbackInfo {
                name: cb.name.clone(),
                description: cb.description.clone(),
                parameters_schema_json: cb.parameters_schema_json.clone(),
            });
        }
        infos
    }
    /// Forwards a guest trace event; silently dropped when tracing is off
    /// or the receiver is gone.
    fn report_trace(&mut self, lineno: u32, event_json: String, context_json: String) {
        let Some(tx) = self.trace_tx.as_ref() else {
            return;
        };
        let _ = tx.send(TraceRequest {
            lineno,
            event_json,
            context_json,
        });
    }
    /// Forwards a chunk of guest output; silently dropped when streaming is
    /// off or the receiver is gone.
    fn report_output(&mut self, stream_id: u32, data: String) {
        let Some(tx) = self.output_tx.as_ref() else {
            return;
        };
        let _ = tx.send(OutputRequest {
            stream: stream_id,
            data,
        });
    }
}
use self::eryx::net::tcp;
use self::eryx::net::tls;
/// Maps the host-side TCP error onto the generated WIT `tcp::TcpError`.
fn to_wit_tcp_error(e: crate::net::TcpError) -> tcp::TcpError {
    use crate::net::TcpError;
    match e {
        TcpError::ConnectionRefused => tcp::TcpError::ConnectionRefused,
        TcpError::ConnectionReset => tcp::TcpError::ConnectionReset,
        TcpError::TimedOut => tcp::TcpError::TimedOut,
        TcpError::HostNotFound => tcp::TcpError::HostNotFound,
        TcpError::IoError(msg) => tcp::TcpError::IoError(msg),
        TcpError::NotPermitted(msg) => tcp::TcpError::NotPermitted(msg),
        TcpError::InvalidHandle => tcp::TcpError::InvalidHandle,
    }
}
/// Maps the host-side TLS error onto the generated WIT `tls::TlsError`,
/// converting any nested TCP error as well.
fn to_wit_tls_error(e: crate::net::TlsError) -> tls::TlsError {
    use crate::net::TlsError;
    match e {
        TlsError::Tcp(tcp_err) => tls::TlsError::Tcp(to_wit_tcp_error(tcp_err)),
        TlsError::HandshakeFailed(msg) => tls::TlsError::HandshakeFailed(msg),
        TlsError::CertificateError(msg) => tls::TlsError::CertificateError(msg),
        TlsError::InvalidHandle => tls::TlsError::InvalidHandle,
    }
}
/// Host side of the `eryx:net/tcp` WIT interface.
///
/// Each operation is proxied over `net_tx` to the embedder's network handler;
/// with no handler installed the guest gets `not-permitted` (and `close`
/// becomes a no-op).
impl tcp::Host for ExecutorState {
    async fn connect(&mut self, host: String, port: u16) -> Result<u32, tcp::TcpError> {
        tracing::debug!(host = %host, port, "TCP connect requested");
        let Some(tx) = self.net_tx.clone() else {
            return Err(tcp::TcpError::NotPermitted(
                "networking not enabled for this sandbox".into(),
            ));
        };
        let (response_tx, response_rx) = oneshot::channel();
        tx.send(NetRequest::TcpConnect {
            host,
            port,
            response_tx,
        })
        .await
        .map_err(|_| tcp::TcpError::IoError("network handler channel closed".into()))?;
        let outcome = response_rx
            .await
            .map_err(|_| tcp::TcpError::IoError("network response channel closed".into()))?;
        outcome.map_err(to_wit_tcp_error)
    }
    async fn read(&mut self, handle: u32, len: u32) -> Result<Vec<u8>, tcp::TcpError> {
        tracing::trace!(handle, len, "TCP read requested");
        let Some(tx) = self.net_tx.clone() else {
            return Err(tcp::TcpError::NotPermitted(
                "networking not enabled for this sandbox".into(),
            ));
        };
        let (response_tx, response_rx) = oneshot::channel();
        tx.send(NetRequest::TcpRead {
            handle,
            len,
            response_tx,
        })
        .await
        .map_err(|_| tcp::TcpError::IoError("network handler channel closed".into()))?;
        let outcome = response_rx
            .await
            .map_err(|_| tcp::TcpError::IoError("network response channel closed".into()))?;
        outcome.map_err(to_wit_tcp_error)
    }
    async fn write(&mut self, handle: u32, data: Vec<u8>) -> Result<u32, tcp::TcpError> {
        tracing::trace!(handle, len = data.len(), "TCP write requested");
        let Some(tx) = self.net_tx.clone() else {
            return Err(tcp::TcpError::NotPermitted(
                "networking not enabled for this sandbox".into(),
            ));
        };
        let (response_tx, response_rx) = oneshot::channel();
        tx.send(NetRequest::TcpWrite {
            handle,
            data,
            response_tx,
        })
        .await
        .map_err(|_| tcp::TcpError::IoError("network handler channel closed".into()))?;
        let outcome = response_rx
            .await
            .map_err(|_| tcp::TcpError::IoError("network response channel closed".into()))?;
        outcome.map_err(to_wit_tcp_error)
    }
    /// Fire-and-forget: no response channel, and errors are ignored.
    async fn close(&mut self, handle: u32) {
        tracing::debug!(handle, "TCP close requested");
        let Some(tx) = self.net_tx.as_ref() else {
            return;
        };
        let _ = tx.send(NetRequest::TcpClose { handle }).await;
    }
}
/// Host side of the `eryx:net/tls` WIT interface.
///
/// Mirrors the TCP implementation: operations are proxied over `net_tx`;
/// channel failures surface as TCP-level I/O errors wrapped in `TlsError::Tcp`.
impl tls::Host for ExecutorState {
    async fn upgrade(&mut self, tcp_handle: u32, hostname: String) -> Result<u32, tls::TlsError> {
        tracing::debug!(tcp_handle, hostname = %hostname, "TLS upgrade requested");
        let Some(tx) = self.net_tx.clone() else {
            return Err(tls::TlsError::Tcp(tcp::TcpError::NotPermitted(
                "networking not enabled for this sandbox".into(),
            )));
        };
        let send_err = |_| {
            tls::TlsError::Tcp(tcp::TcpError::IoError(
                "network handler channel closed".into(),
            ))
        };
        let recv_err = |_| {
            tls::TlsError::Tcp(tcp::TcpError::IoError(
                "network response channel closed".into(),
            ))
        };
        let (response_tx, response_rx) = oneshot::channel();
        tx.send(NetRequest::TlsUpgrade {
            tcp_handle,
            hostname,
            response_tx,
        })
        .await
        .map_err(send_err)?;
        response_rx
            .await
            .map_err(recv_err)?
            .map_err(to_wit_tls_error)
    }
    async fn read(&mut self, handle: u32, len: u32) -> Result<Vec<u8>, tls::TlsError> {
        tracing::trace!(handle, len, "TLS read requested");
        let Some(tx) = self.net_tx.clone() else {
            return Err(tls::TlsError::Tcp(tcp::TcpError::NotPermitted(
                "networking not enabled for this sandbox".into(),
            )));
        };
        let send_err = |_| {
            tls::TlsError::Tcp(tcp::TcpError::IoError(
                "network handler channel closed".into(),
            ))
        };
        let recv_err = |_| {
            tls::TlsError::Tcp(tcp::TcpError::IoError(
                "network response channel closed".into(),
            ))
        };
        let (response_tx, response_rx) = oneshot::channel();
        tx.send(NetRequest::TlsRead {
            handle,
            len,
            response_tx,
        })
        .await
        .map_err(send_err)?;
        response_rx
            .await
            .map_err(recv_err)?
            .map_err(to_wit_tls_error)
    }
    async fn write(&mut self, handle: u32, data: Vec<u8>) -> Result<u32, tls::TlsError> {
        tracing::trace!(handle, len = data.len(), "TLS write requested");
        let Some(tx) = self.net_tx.clone() else {
            return Err(tls::TlsError::Tcp(tcp::TcpError::NotPermitted(
                "networking not enabled for this sandbox".into(),
            )));
        };
        let send_err = |_| {
            tls::TlsError::Tcp(tcp::TcpError::IoError(
                "network handler channel closed".into(),
            ))
        };
        let recv_err = |_| {
            tls::TlsError::Tcp(tcp::TcpError::IoError(
                "network response channel closed".into(),
            ))
        };
        let (response_tx, response_rx) = oneshot::channel();
        tx.send(NetRequest::TlsWrite {
            handle,
            data,
            response_tx,
        })
        .await
        .map_err(send_err)?;
        response_rx
            .await
            .map_err(recv_err)?
            .map_err(to_wit_tls_error)
    }
    /// Fire-and-forget: no response channel, and errors are ignored.
    async fn close(&mut self, handle: u32) {
        tracing::debug!(handle, "TLS close requested");
        let Some(tx) = self.net_tx.as_ref() else {
            return;
        };
        let _ = tx.send(NetRequest::TlsClose { handle }).await;
    }
}
/// Builder for one execution of Python code on a [`PythonExecutor`].
///
/// Created by [`PythonExecutor::execute`]; configure with the `with_*`
/// methods and start with [`ExecuteBuilder::run`].
pub struct ExecuteBuilder<'a> {
    executor: &'a PythonExecutor,
    code: String,
    // Callback metadata + the channel invocations are delivered on.
    callbacks: Vec<Arc<dyn Callback>>,
    callback_tx: Option<mpsc::Sender<CallbackRequest>>,
    trace_tx: Option<mpsc::UnboundedSender<TraceRequest>>,
    net_tx: Option<mpsc::Sender<NetRequest>>,
    output_tx: Option<mpsc::UnboundedSender<OutputRequest>>,
    // Optional resource limits for this run.
    memory_limit: Option<u64>,
    execution_timeout: Option<Duration>,
    cancellation_token: Option<CancellationToken>,
    fuel_limit: Option<u64>,
    #[cfg(feature = "vfs")]
    vfs_storage: Option<eryx_vfs::ArcStorage>,
    #[cfg(feature = "vfs")]
    volumes: Vec<crate::session::VolumeMount>,
}
impl std::fmt::Debug for ExecuteBuilder<'_> {
    /// Manual `Debug`: channels and trait objects are opaque, so only
    /// presence/size information is printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut ds = f.debug_struct("ExecuteBuilder");
        ds.field("code_len", &self.code.len());
        ds.field("callbacks_count", &self.callbacks.len());
        ds.field("has_callback_tx", &self.callback_tx.is_some());
        ds.field("has_trace_tx", &self.trace_tx.is_some());
        ds.field("has_net_tx", &self.net_tx.is_some());
        ds.field("has_output_tx", &self.output_tx.is_some());
        ds.field("memory_limit", &self.memory_limit);
        ds.field("execution_timeout", &self.execution_timeout);
        ds.field("has_cancellation_token", &self.cancellation_token.is_some());
        ds.field("fuel_limit", &self.fuel_limit);
        #[cfg(feature = "vfs")]
        ds.field("has_vfs_storage", &self.vfs_storage.is_some());
        #[cfg(feature = "vfs")]
        ds.field("volumes_count", &self.volumes.len());
        ds.finish_non_exhaustive()
    }
}
impl<'a> ExecuteBuilder<'a> {
    /// Creates a builder for running `code` on `executor` with no options set.
    fn new(executor: &'a PythonExecutor, code: impl Into<String>) -> Self {
        Self {
            executor,
            code: code.into(),
            callbacks: Vec::new(),
            callback_tx: None,
            trace_tx: None,
            net_tx: None,
            output_tx: None,
            memory_limit: None,
            execution_timeout: None,
            cancellation_token: None,
            fuel_limit: None,
            #[cfg(feature = "vfs")]
            vfs_storage: None,
            #[cfg(feature = "vfs")]
            volumes: Vec::new(),
        }
    }
    /// Registers host callbacks the guest may invoke, plus the channel their
    /// invocations are delivered on.
    #[must_use]
    pub fn with_callbacks(
        mut self,
        callbacks: &[Arc<dyn Callback>],
        callback_tx: mpsc::Sender<CallbackRequest>,
    ) -> Self {
        self.callbacks = callbacks.to_vec();
        self.callback_tx = Some(callback_tx);
        self
    }
    /// Enables trace-event reporting on the given channel.
    #[must_use]
    pub fn with_tracing(mut self, trace_tx: mpsc::UnboundedSender<TraceRequest>) -> Self {
        self.trace_tx = Some(trace_tx);
        self
    }
    /// Enables guest networking, proxied through the given channel.
    #[must_use]
    pub fn with_network(mut self, net_tx: mpsc::Sender<NetRequest>) -> Self {
        self.net_tx = Some(net_tx);
        self
    }
    /// Enables streaming of guest output on the given channel.
    #[must_use]
    pub fn with_output_streaming(
        mut self,
        output_tx: mpsc::UnboundedSender<OutputRequest>,
    ) -> Self {
        self.output_tx = Some(output_tx);
        self
    }
    /// Caps guest linear memory at `limit` bytes.
    #[must_use]
    pub fn with_memory_limit(mut self, limit: u64) -> Self {
        self.memory_limit = Some(limit);
        self
    }
    /// Aborts the execution after `timeout` of wall-clock time.
    #[must_use]
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.execution_timeout = Some(timeout);
        self
    }
    /// Allows the caller to cancel the execution via `token`.
    #[must_use]
    pub fn with_cancellation(mut self, token: CancellationToken) -> Self {
        self.cancellation_token = Some(token);
        self
    }
    /// Caps execution at `fuel` units of wasmtime fuel.
    #[must_use]
    pub fn with_fuel_limit(mut self, fuel: u64) -> Self {
        self.fuel_limit = Some(fuel);
        self
    }
    /// Uses `storage` as the backing store for the hybrid VFS.
    #[cfg(feature = "vfs")]
    #[must_use]
    pub fn with_vfs_storage(mut self, storage: eryx_vfs::ArcStorage) -> Self {
        self.vfs_storage = Some(storage);
        self
    }
    /// Mounts the given volumes into the guest filesystem.
    #[cfg(feature = "vfs")]
    #[must_use]
    pub fn with_volumes(mut self, volumes: Vec<crate::session::VolumeMount>) -> Self {
        self.volumes = volumes;
        self
    }
    /// Runs the code with the accumulated options.
    ///
    /// # Errors
    /// Returns the executor's error for initialization, compilation, limit,
    /// or runtime failures.
    pub async fn run(self) -> std::result::Result<ExecutionOutput, Error> {
        // Positional forwarding: argument order must match execute_internal's
        // signature exactly, including the cfg-gated trailing parameters.
        self.executor
            .execute_internal(
                &self.code,
                &self.callbacks,
                self.callback_tx,
                self.trace_tx,
                self.net_tx,
                self.output_tx,
                self.memory_limit,
                self.execution_timeout,
                self.cancellation_token,
                self.fuel_limit,
                #[cfg(feature = "vfs")]
                self.vfs_storage,
                #[cfg(feature = "vfs")]
                self.volumes,
            )
            .await
    }
}
/// A reusable, pre-instantiated Python sandbox executor.
///
/// Holds the wasmtime engine and a pre-linked component instance; individual
/// runs are configured via [`PythonExecutor::execute`].
pub struct PythonExecutor {
    engine: Engine,
    // Pre-linked instance: instantiation per run is cheap.
    instance_pre: SandboxPre<ExecutorState>,
    // Host path to the Python stdlib, mounted read-only at /python-stdlib.
    python_stdlib_path: Option<PathBuf>,
    // Host paths mounted read-only at /site-packages, /site-packages-1, ...
    python_site_packages_paths: Vec<PathBuf>,
}
impl std::fmt::Debug for PythonExecutor {
    /// Manual `Debug`: the engine and pre-instantiated component are opaque,
    /// so they are rendered as placeholders.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut ds = f.debug_struct("PythonExecutor");
        ds.field("engine", &"<wasmtime::Engine>");
        ds.field("instance_pre", &"<SandboxPre>");
        ds.field("python_stdlib_path", &self.python_stdlib_path);
        ds.field(
            "python_site_packages_paths",
            &self.python_site_packages_paths,
        );
        ds.finish_non_exhaustive()
    }
}
impl PythonExecutor {
    /// The wasmtime engine backing this executor.
    #[must_use]
    pub fn engine(&self) -> &Engine {
        &self.engine
    }
    /// The pre-linked sandbox instance used for each run.
    #[must_use]
    pub fn instance_pre(&self) -> &SandboxPre<ExecutorState> {
        &self.instance_pre
    }
    /// Host path of the Python stdlib, if configured.
    #[must_use]
    pub fn python_stdlib_path(&self) -> Option<&PathBuf> {
        self.python_stdlib_path.as_ref()
    }
    /// Host paths of the configured site-packages directories.
    #[must_use]
    pub fn python_site_packages_paths(&self) -> &[PathBuf] {
        &self.python_site_packages_paths
    }
    /// Sets the host path of the Python stdlib to mount at `/python-stdlib`.
    #[must_use]
    pub fn with_python_stdlib(mut self, path: impl Into<PathBuf>) -> Self {
        self.python_stdlib_path = Some(path.into());
        self
    }
    /// Appends a host site-packages directory (mounted at `/site-packages`,
    /// then `/site-packages-1`, ...).
    #[must_use]
    pub fn with_site_packages(mut self, path: impl Into<PathBuf>) -> Self {
        self.python_site_packages_paths.push(path.into());
        self
    }
pub fn shared_engine() -> std::result::Result<Engine, Error> {
use std::sync::OnceLock;
static SHARED_ENGINE: OnceLock<Engine> = OnceLock::new();
if let Some(engine) = SHARED_ENGINE.get() {
return Ok(engine.clone());
}
let engine = Self::create_engine()?;
Ok(SHARED_ENGINE.get_or_init(|| engine).clone())
}
    /// Builds an executor by compiling a WASM component from bytes using the
    /// shared engine.
    ///
    /// # Errors
    /// Returns an error if the shared engine or the component cannot be created.
    #[tracing::instrument(
        name = "PythonExecutor::from_binary",
        skip(wasm_bytes),
        fields(wasm_bytes_len = wasm_bytes.len())
    )]
    pub fn from_binary(wasm_bytes: &[u8]) -> std::result::Result<Self, Error> {
        let engine = Self::shared_engine()?;
        let component =
            Component::from_binary(&engine, wasm_bytes).map_err(Error::WasmComponent)?;
        let instance_pre = Self::create_instance_pre(&engine, &component)?;
        Ok(Self {
            engine,
            instance_pre,
            python_stdlib_path: None,
            python_site_packages_paths: Vec::new(),
        })
    }
    /// Builds an executor by compiling a WASM component from a file using the
    /// shared engine.
    ///
    /// # Errors
    /// Returns an error if the shared engine or the component cannot be created.
    #[tracing::instrument(
        name = "PythonExecutor::from_file",
        fields(path = %path.as_ref().display())
    )]
    pub fn from_file(path: impl AsRef<std::path::Path>) -> std::result::Result<Self, Error> {
        let engine = Self::shared_engine()?;
        let component =
            Component::from_file(&engine, path.as_ref()).map_err(Error::WasmComponent)?;
        let instance_pre = Self::create_instance_pre(&engine, &component)?;
        Ok(Self {
            engine,
            instance_pre,
            python_stdlib_path: None,
            python_site_packages_paths: Vec::new(),
        })
    }
#[cfg(any(feature = "embedded", feature = "preinit"))]
#[allow(unsafe_code)]
pub unsafe fn from_precompiled(precompiled_bytes: &[u8]) -> std::result::Result<Self, Error> {
let engine = Self::shared_engine()?;
let component = unsafe { Component::deserialize(&engine, precompiled_bytes) }
.map_err(Error::WasmComponent)?;
let instance_pre = Self::create_instance_pre(&engine, &component)?;
Ok(Self {
engine,
instance_pre,
python_stdlib_path: None,
python_site_packages_paths: Vec::new(),
})
}
#[cfg(any(feature = "embedded", feature = "preinit"))]
#[allow(unsafe_code)]
pub unsafe fn from_precompiled_file(
path: impl AsRef<std::path::Path>,
) -> std::result::Result<Self, Error> {
#[cfg(feature = "embedded")]
{
#[allow(unsafe_code)]
unsafe {
Self::from_precompiled_file_internal(path.as_ref(), None)
}
}
#[cfg(not(feature = "embedded"))]
{
let engine = Self::shared_engine()?;
#[allow(unsafe_code)]
let component = unsafe { Component::deserialize_file(&engine, path.as_ref()) }
.map_err(Error::WasmComponent)?;
let instance_pre = Self::create_instance_pre(&engine, &component)?;
Ok(Self {
engine,
instance_pre,
python_stdlib_path: None,
python_site_packages_paths: Vec::new(),
})
}
}
#[cfg(feature = "embedded")]
#[allow(unsafe_code)]
pub(crate) unsafe fn from_precompiled_file_with_key(
path: impl AsRef<std::path::Path>,
cache_key: CacheKey,
) -> std::result::Result<Self, Error> {
#[allow(unsafe_code)]
unsafe {
Self::from_precompiled_file_internal(path.as_ref(), Some(cache_key))
}
}
    /// Shared implementation for the precompiled-file constructors: checks the
    /// global instance-pre cache (when `cache_key` is given), otherwise
    /// deserializes the file, links it, and stores the result in the cache.
    ///
    /// # Safety
    /// Same contract as [`Self::from_precompiled`]: `path` must point to
    /// trusted precompiled output from a compatible engine.
    #[cfg(feature = "embedded")]
    #[allow(unsafe_code)]
    unsafe fn from_precompiled_file_internal(
        path: &std::path::Path,
        cache_key: Option<CacheKey>,
    ) -> std::result::Result<Self, Error> {
        let engine = Self::shared_engine()?;
        // Cache hit: skip deserialization and linking entirely.
        if let Some(ref key) = cache_key
            && let Some(instance_pre) = InstancePreCache::global().get(key)
        {
            return Ok(Self {
                engine,
                instance_pre,
                python_stdlib_path: None,
                python_site_packages_paths: Vec::new(),
            });
        }
        // SAFETY: caller guarantees the file is trusted precompiled output
        // compatible with this engine (see # Safety above).
        #[allow(unsafe_code)]
        let component =
            unsafe { Component::deserialize_file(&engine, path) }.map_err(Error::WasmComponent)?;
        let instance_pre = Self::create_instance_pre(&engine, &component)?;
        if let Some(key) = cache_key {
            InstancePreCache::global().put(key, instance_pre.clone());
        }
        Ok(Self {
            engine,
            instance_pre,
            python_stdlib_path: None,
            python_site_packages_paths: Vec::new(),
        })
    }
    /// Builds an executor from the embedded runtime resources, with the
    /// embedded Python stdlib preconfigured and the instance-pre cached
    /// globally.
    ///
    /// # Errors
    /// Returns an error if the embedded resources, engine, or component
    /// cannot be loaded.
    #[cfg(feature = "embedded")]
    #[tracing::instrument(name = "PythonExecutor::from_embedded_runtime")]
    pub fn from_embedded_runtime() -> std::result::Result<Self, Error> {
        let cache_key = CacheKey::embedded_runtime();
        let resources = crate::embedded::EmbeddedResources::get()?;
        let stdlib_path = Some(resources.stdlib_path.clone());
        // Fast path: reuse the cached, already-linked instance.
        if let Some(instance_pre) = InstancePreCache::global().get(&cache_key) {
            return Ok(Self {
                engine: Self::shared_engine()?,
                instance_pre,
                python_stdlib_path: stdlib_path,
                python_site_packages_paths: Vec::new(),
            });
        }
        // SAFETY: the precompiled runtime file comes from our own embedded
        // resources, produced by a compatible engine configuration.
        #[allow(unsafe_code)]
        let mut executor =
            unsafe { Self::from_precompiled_file_with_key(resources.runtime(), cache_key)? };
        executor.python_stdlib_path = stdlib_path;
        Ok(executor)
    }
    /// Builds an executor directly from an already-linked instance-pre
    /// (used by the native-extensions cache path).
    #[cfg(all(feature = "embedded", feature = "native-extensions"))]
    pub(crate) fn from_cached_instance_pre(
        instance_pre: SandboxPre<ExecutorState>,
    ) -> std::result::Result<Self, Error> {
        Ok(Self {
            engine: Self::shared_engine()?,
            instance_pre,
            python_stdlib_path: None,
            python_site_packages_paths: Vec::new(),
        })
    }
    /// Precompiles a WASM component to wasmtime's serialized form for the
    /// host target.
    ///
    /// # Errors
    /// Returns `Error::WasmEngine` if engine creation or precompilation fails.
    pub fn precompile(wasm_bytes: &[u8]) -> std::result::Result<Vec<u8>, Error> {
        // Uses a fresh engine (not the shared one) for the compile-only step.
        let engine = Self::create_engine()?;
        engine
            .precompile_component(wasm_bytes)
            .map_err(|e| Error::WasmEngine(format!("Failed to precompile component: {e}")))
    }
    /// Reads a WASM component from `path` and precompiles it.
    ///
    /// # Errors
    /// Returns `Error::WasmEngine` if the file cannot be read or
    /// precompilation fails.
    pub fn precompile_file(
        path: impl AsRef<std::path::Path>,
    ) -> std::result::Result<Vec<u8>, Error> {
        let wasm_bytes = std::fs::read(path.as_ref())
            .map_err(|e| Error::WasmEngine(format!("Failed to read WASM file: {e}")))?;
        Self::precompile(&wasm_bytes)
    }
    /// Precompiles for an explicit target triple (`None` = host / env default).
    ///
    /// # Errors
    /// Returns `Error::WasmEngine` on an invalid target or precompile failure.
    pub fn precompile_with_target(
        wasm_bytes: &[u8],
        target: Option<&str>,
    ) -> std::result::Result<Vec<u8>, Error> {
        let engine = Self::create_engine_with_target(target)?;
        engine
            .precompile_component(wasm_bytes)
            .map_err(|e| Error::WasmEngine(format!("Failed to precompile component: {e}")))
    }
    /// Reads a WASM component from `path` and precompiles it for `target`.
    ///
    /// # Errors
    /// Returns `Error::WasmEngine` if the file cannot be read, the target is
    /// invalid, or precompilation fails.
    pub fn precompile_file_with_target(
        path: impl AsRef<std::path::Path>,
        target: Option<&str>,
    ) -> std::result::Result<Vec<u8>, Error> {
        let wasm_bytes = std::fs::read(path.as_ref())
            .map_err(|e| Error::WasmEngine(format!("Failed to read WASM file: {e}")))?;
        Self::precompile_with_target(&wasm_bytes, target)
    }
    /// Precompiles for an explicit target and CPU feature level, ignoring the
    /// `ERYX_*` environment overrides.
    ///
    /// # Errors
    /// Returns `Error::WasmEngine` on an invalid target or precompile failure.
    #[cfg(any(feature = "embedded", feature = "preinit"))]
    pub fn precompile_with_options(
        wasm_bytes: &[u8],
        target: Option<&str>,
        cpu_features: CpuFeatureLevel,
    ) -> std::result::Result<Vec<u8>, Error> {
        let engine = Self::create_engine_with_options(target, cpu_features)?;
        engine
            .precompile_component(wasm_bytes)
            .map_err(|e| Error::WasmEngine(format!("Failed to precompile component: {e}")))
    }
    /// Version number of the engine configuration below.
    // NOTE(review): presumably bumped whenever the Config settings change so
    // cached precompiled artifacts are invalidated — confirm against the
    // cache-key code.
    pub const ENGINE_CONFIG_VERSION: u32 = 3;
    /// Creates an engine with the default configuration (host target, env
    /// overrides honored).
    fn create_engine() -> std::result::Result<Engine, Error> {
        Self::create_engine_with_target(None)
    }
    /// Creates an engine, optionally cross-compiling for `target` (falling
    /// back to the `ERYX_TARGET` env var when `target` is `None`).
    ///
    /// Keep this configuration in sync with `create_engine_with_options` and
    /// bump `ENGINE_CONFIG_VERSION` when it changes.
    ///
    /// # Errors
    /// Returns `Error::WasmEngine` on an invalid target or engine failure.
    fn create_engine_with_target(target: Option<&str>) -> std::result::Result<Engine, Error> {
        let mut config = Config::new();
        config.wasm_component_model(true);
        config.wasm_component_model_async(true);
        config.async_support(true);
        // Epoch interruption + fuel enable timeouts and fuel limits per run.
        config.epoch_interruption(true);
        config.consume_fuel(true);
        config.memory_init_cow(true);
        config.cranelift_opt_level(wasmtime::OptLevel::SpeedAndSize);
        config.async_stack_size(512 * 1024);
        let effective_target = target
            .map(|s| s.to_string())
            .or_else(|| std::env::var("ERYX_TARGET").ok());
        if let Some(ref target_str) = effective_target {
            config
                .target(target_str)
                .map_err(|e| Error::WasmEngine(format!("Invalid target '{target_str}': {e}")))?;
        }
        #[cfg(any(feature = "embedded", feature = "preinit"))]
        Self::apply_cpu_feature_flags(&mut config)?;
        Engine::new(&config).map_err(|e| Error::WasmEngine(e.to_string()))
    }
    /// Like `create_engine_with_target`, but with an explicit CPU feature
    /// level and no environment-variable overrides.
    ///
    /// # Errors
    /// Returns `Error::WasmEngine` on an invalid target or engine failure.
    #[cfg(any(feature = "embedded", feature = "preinit"))]
    fn create_engine_with_options(
        target: Option<&str>,
        cpu_features: CpuFeatureLevel,
    ) -> std::result::Result<Engine, Error> {
        let mut config = Config::new();
        config.wasm_component_model(true);
        config.wasm_component_model_async(true);
        config.async_support(true);
        config.epoch_interruption(true);
        config.consume_fuel(true);
        config.memory_init_cow(true);
        config.cranelift_opt_level(wasmtime::OptLevel::SpeedAndSize);
        config.async_stack_size(512 * 1024);
        if let Some(target_str) = target {
            config
                .target(target_str)
                .map_err(|e| Error::WasmEngine(format!("Invalid target '{target_str}': {e}")))?;
        }
        Self::apply_cpu_feature_level(&mut config, cpu_features)?;
        Engine::new(&config).map_err(|e| Error::WasmEngine(e.to_string()))
    }
#[cfg(any(feature = "embedded", feature = "preinit"))]
#[allow(unsafe_code)]
fn apply_cpu_feature_level(
config: &mut Config,
level: CpuFeatureLevel,
) -> std::result::Result<(), Error> {
const AVX512_FLAGS: &[&str] = &[
"has_avx512bitalg",
"has_avx512dq",
"has_avx512f",
"has_avx512vbmi",
"has_avx512vl",
];
match level {
CpuFeatureLevel::X86_64 => {
unsafe {
config.cranelift_flag_set("has_sse3", "false");
config.cranelift_flag_set("has_ssse3", "false");
config.cranelift_flag_set("has_sse41", "false");
config.cranelift_flag_set("has_sse42", "false");
config.cranelift_flag_set("has_avx", "false");
config.cranelift_flag_set("has_avx2", "false");
config.cranelift_flag_set("has_fma", "false");
config.cranelift_flag_set("has_bmi1", "false");
config.cranelift_flag_set("has_bmi2", "false");
config.cranelift_flag_set("has_lzcnt", "false");
config.cranelift_flag_set("has_popcnt", "false");
for flag in AVX512_FLAGS {
config.cranelift_flag_set(flag, "false");
}
}
}
CpuFeatureLevel::X86_64v2 => {
unsafe {
config.cranelift_flag_set("has_avx", "false");
config.cranelift_flag_set("has_avx2", "false");
config.cranelift_flag_set("has_fma", "false");
config.cranelift_flag_set("has_bmi1", "false");
config.cranelift_flag_set("has_bmi2", "false");
for flag in AVX512_FLAGS {
config.cranelift_flag_set(flag, "false");
}
}
}
CpuFeatureLevel::X86_64v3 => {
unsafe {
for flag in AVX512_FLAGS {
config.cranelift_flag_set(flag, "false");
}
}
}
CpuFeatureLevel::X86_64v4 | CpuFeatureLevel::Native => {
}
}
Ok(())
}
#[cfg(any(feature = "embedded", feature = "preinit"))]
#[allow(unsafe_code)]
fn apply_cpu_feature_flags(config: &mut Config) -> std::result::Result<(), Error> {
if let Ok(level) = std::env::var("ERYX_CPU_FEATURES") {
const AVX512_FLAGS: &[&str] = &[
"has_avx512bitalg",
"has_avx512dq",
"has_avx512f",
"has_avx512vbmi",
"has_avx512vl",
];
match level.as_str() {
"x86-64" | "x86-64-v1" => {
unsafe {
config.cranelift_flag_set("has_sse3", "false");
config.cranelift_flag_set("has_ssse3", "false");
config.cranelift_flag_set("has_sse41", "false");
config.cranelift_flag_set("has_sse42", "false");
config.cranelift_flag_set("has_avx", "false");
config.cranelift_flag_set("has_avx2", "false");
config.cranelift_flag_set("has_fma", "false");
config.cranelift_flag_set("has_bmi1", "false");
config.cranelift_flag_set("has_bmi2", "false");
config.cranelift_flag_set("has_lzcnt", "false");
config.cranelift_flag_set("has_popcnt", "false");
for flag in AVX512_FLAGS {
config.cranelift_flag_set(flag, "false");
}
}
}
"x86-64-v2" => {
unsafe {
config.cranelift_flag_set("has_avx", "false");
config.cranelift_flag_set("has_avx2", "false");
config.cranelift_flag_set("has_fma", "false");
config.cranelift_flag_set("has_bmi1", "false");
config.cranelift_flag_set("has_bmi2", "false");
for flag in AVX512_FLAGS {
config.cranelift_flag_set(flag, "false");
}
}
}
"x86-64-v3" => {
unsafe {
for flag in AVX512_FLAGS {
config.cranelift_flag_set(flag, "false");
}
}
}
"x86-64-v4" | "native" => {
}
other => {
return Err(Error::WasmEngine(format!(
"Unknown CPU feature level '{other}'. Valid values: \
x86-64, x86-64-v2, x86-64-v3, x86-64-v4, native"
)));
}
}
}
if let Ok(flags) = std::env::var("ERYX_CRANELIFT_FLAGS") {
for flag_spec in flags.split(',') {
let flag_spec = flag_spec.trim();
if flag_spec.is_empty() {
continue;
}
unsafe {
if let Some((flag, value)) = flag_spec.split_once('=') {
config.cranelift_flag_set(flag.trim(), value.trim());
} else {
config.cranelift_flag_enable(flag_spec);
}
}
}
}
Ok(())
}
    /// Links WASI, the optional hybrid VFS, and the sandbox host imports, then
    /// pre-instantiates `component` so per-run instantiation is cheap.
    ///
    /// # Errors
    /// Returns `Error::WasmEngine` if any linking or pre-instantiation step fails.
    #[tracing::instrument(name = "PythonExecutor::create_instance_pre", skip(engine, component))]
    fn create_instance_pre(
        engine: &Engine,
        component: &Component,
    ) -> std::result::Result<SandboxPre<ExecutorState>, Error> {
        let mut linker = Linker::<ExecutorState>::new(engine);
        wasmtime_wasi::p2::add_to_linker_async(&mut linker)
            .map_err(|e| Error::WasmEngine(format!("Failed to add WASI to linker: {e}")))?;
        #[cfg(feature = "vfs")]
        {
            // The VFS overrides some WASI filesystem exports, so shadowing is
            // permitted only for this step.
            linker.allow_shadowing(true);
            eryx_vfs::add_hybrid_vfs_to_linker(&mut linker).map_err(|e| {
                Error::WasmEngine(format!("Failed to add hybrid VFS to linker: {e}"))
            })?;
            linker.allow_shadowing(false);
        }
        tracing::debug!("Adding sandbox bindings to linker");
        Sandbox::add_to_linker::<_, HasSelf<ExecutorState>>(&mut linker, |state| state)
            .map_err(|e| Error::WasmEngine(format!("Failed to add sandbox to linker: {e}")))?;
        tracing::debug!("Sandbox bindings added successfully");
        let instance_pre = linker
            .instantiate_pre(component)
            .map_err(|e| Error::WasmEngine(format!("Failed to create instance_pre: {e}")))?;
        SandboxPre::new(instance_pre)
            .map_err(|e| Error::WasmEngine(format!("Failed to create SandboxPre: {e}")))
    }
    /// Starts building an execution of `code`; configure the returned builder
    /// and call [`ExecuteBuilder::run`] to execute.
    #[must_use]
    pub fn execute(&self, code: impl Into<String>) -> ExecuteBuilder<'_> {
        ExecuteBuilder::new(self, code)
    }
/// Core execution path: instantiates the pre-linked sandbox component and runs
/// `code` inside it, wiring host-side channels and enforcing resource limits.
///
/// * `callbacks` — host callbacks advertised to the guest (name/description/
///   JSON-schema snapshot).
/// * `callback_tx` / `trace_tx` / `net_tx` / `output_tx` — channels handed to
///   the guest-facing `ExecutorState`; `None` disables the facility.
/// * `memory_limit` — byte cap enforced via `MemoryTracker` as the store's
///   `ResourceLimiter`.
/// * `execution_timeout` / `cancellation_token` — enforced via wasmtime epoch
///   interruption (plus a tokio timeout as a second layer for the timeout).
/// * `fuel_limit` — wasmtime fuel; `None` means effectively unlimited
///   (`u64::MAX`).
///
/// Returns the guest's stdout/stderr along with peak memory and fuel
/// consumption. Errors map to `Error::Timeout`, `Error::Cancelled`,
/// `Error::FuelExhausted`, or `Error::Execution` as appropriate.
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(
    name = "PythonExecutor::execute_internal",
    skip_all,
    fields(
        code_len = code.len(),
        callbacks = callbacks.len(),
        memory_limit = ?memory_limit,
        timeout = ?execution_timeout,
        fuel_limit = ?fuel_limit,
    )
)]
async fn execute_internal(
    &self,
    code: &str,
    callbacks: &[Arc<dyn Callback>],
    callback_tx: Option<mpsc::Sender<CallbackRequest>>,
    trace_tx: Option<mpsc::UnboundedSender<TraceRequest>>,
    net_tx: Option<mpsc::Sender<NetRequest>>,
    output_tx: Option<mpsc::UnboundedSender<OutputRequest>>,
    memory_limit: Option<u64>,
    execution_timeout: Option<Duration>,
    cancellation_token: Option<CancellationToken>,
    fuel_limit: Option<u64>,
    #[cfg(feature = "vfs")] vfs_storage: Option<eryx_vfs::ArcStorage>,
    #[cfg(feature = "vfs")] volumes: Vec<crate::session::VolumeMount>,
) -> std::result::Result<ExecutionOutput, Error> {
    // Snapshot callback metadata for the guest. A schema that fails to
    // serialize degrades to "{}" rather than aborting the whole execution.
    let callback_infos: Vec<HostCallbackInfo> = callbacks
        .iter()
        .map(|cb| HostCallbackInfo {
            name: cb.name().to_string(),
            description: cb.description().to_string(),
            parameters_schema_json: serde_json::to_string(&cb.parameters_schema())
                .unwrap_or_else(|_| "{}".to_string()),
        })
        .collect();
    let mut wasi_builder = WasiCtxBuilder::new();
    wasi_builder.inherit_stdout().inherit_stderr();
    // Compute PYTHONPATH entries up front so the env var reflects every mount
    // performed below ("/site-packages" for the first path, numbered after).
    let mut pythonpath_parts = Vec::new();
    if self.python_stdlib_path.is_some() {
        pythonpath_parts.push("/python-stdlib".to_string());
    }
    for i in 0..self.python_site_packages_paths.len() {
        if i == 0 {
            pythonpath_parts.push("/site-packages".to_string());
        } else {
            pythonpath_parts.push(format!("/site-packages-{i}"));
        }
    }
    // Mount the Python stdlib read-only and point PYTHONHOME at it.
    if let Some(ref stdlib_path) = self.python_stdlib_path {
        wasi_builder.env("PYTHONHOME", "/python-stdlib");
        wasi_builder
            .preopened_dir(
                stdlib_path,
                "/python-stdlib",
                DirPerms::READ,
                FilePerms::READ,
            )
            .map_err(|e| {
                Error::Initialization(format!("Failed to mount Python stdlib: {e}"))
            })?;
    }
    if !pythonpath_parts.is_empty() {
        wasi_builder.env("PYTHONPATH", pythonpath_parts.join(":"));
    }
    // Mount each site-packages directory read-only at its computed path.
    for (i, site_packages_path) in self.python_site_packages_paths.iter().enumerate() {
        let mount_path = if i == 0 {
            "/site-packages".to_string()
        } else {
            format!("/site-packages-{i}")
        };
        wasi_builder
            .preopened_dir(
                site_packages_path,
                &mount_path,
                DirPerms::READ,
                FilePerms::READ,
            )
            .map_err(|e| Error::Initialization(format!("Failed to mount {mount_path}: {e}")))?;
    }
    let wasi = wasi_builder.build();
    // Hybrid VFS: writable "/data" backed by the provided storage (in-memory
    // by default), plus the same read-only stdlib/site-packages mounts and any
    // caller-supplied volume mounts.
    #[cfg(feature = "vfs")]
    let hybrid_vfs_ctx = {
        let storage = vfs_storage.unwrap_or_else(|| {
            eryx_vfs::ArcStorage::new(std::sync::Arc::new(eryx_vfs::InMemoryStorage::new()))
        });
        let mut ctx = eryx_vfs::HybridVfsCtx::new(storage);
        ctx.add_vfs_preopen(
            "/data",
            eryx_vfs::DirPerms::all(),
            eryx_vfs::FilePerms::all(),
        );
        // Stdlib/site-packages failures here are warnings only: the WASI
        // preopens above already provide these paths.
        if let Some(ref stdlib_path) = self.python_stdlib_path
            && let Err(e) = ctx.add_real_preopen_path(
                "/python-stdlib",
                stdlib_path,
                eryx_vfs::DirPerms::READ,
                eryx_vfs::FilePerms::READ,
            )
        {
            tracing::warn!("Failed to add Python stdlib to hybrid VFS: {e}");
        }
        for (i, site_packages_path) in self.python_site_packages_paths.iter().enumerate() {
            let mount_path = if i == 0 {
                "/site-packages".to_string()
            } else {
                format!("/site-packages-{i}")
            };
            if let Err(e) = ctx.add_real_preopen_path(
                &mount_path,
                site_packages_path,
                eryx_vfs::DirPerms::READ,
                eryx_vfs::FilePerms::READ,
            ) {
                tracing::warn!("Failed to add {mount_path} to hybrid VFS: {e}");
            }
        }
        // Volume mounts are hard requirements: a failed mount aborts execution.
        for volume in &volumes {
            let (dir_perms, file_perms) = if volume.read_only {
                (eryx_vfs::DirPerms::READ, eryx_vfs::FilePerms::READ)
            } else {
                (eryx_vfs::DirPerms::all(), eryx_vfs::FilePerms::all())
            };
            // Single-file mounts use the file-preopen variant; directories use
            // the directory variant.
            let result = if volume.host_path.is_file() {
                ctx.add_real_file_preopen_path(
                    &volume.guest_path,
                    &volume.host_path,
                    dir_perms,
                    file_perms,
                )
            } else {
                ctx.add_real_preopen_path(
                    &volume.guest_path,
                    &volume.host_path,
                    dir_perms,
                    file_perms,
                )
            };
            result.map_err(|e| {
                Error::WasmEngine(format!(
                    "Failed to mount volume {} -> {}: {e}",
                    volume.host_path.display(),
                    volume.guest_path,
                ))
            })?;
        }
        Some(ctx)
    };
    let state = ExecutorState {
        wasi,
        table: ResourceTable::new(),
        callback_tx,
        trace_tx,
        callbacks: callback_infos,
        memory_tracker: MemoryTracker::new(memory_limit),
        net_tx,
        output_tx,
        #[cfg(feature = "vfs")]
        hybrid_vfs_ctx,
    };
    let mut store = Store::new(&self.engine, state);
    // The memory tracker doubles as the store's resource limiter, enforcing
    // `memory_limit` and recording peak usage.
    store.limiter(|state| &mut state.memory_tracker);
    // Far-future placeholder deadline; replaced below once the
    // timeout/cancellation policy is decided.
    store.set_epoch_deadline(u64::MAX / 2);
    // No fuel limit means effectively unlimited fuel.
    let initial_fuel = fuel_limit.unwrap_or(u64::MAX);
    store
        .set_fuel(initial_fuel)
        .map_err(|e| Error::Initialization(format!("Failed to set fuel: {e}")))?;
    let bindings = self
        .instance_pre
        .instantiate_async(&mut store)
        .await
        .map_err(Error::WasmComponent)?;
    tracing::debug!(code_len = code.len(), "Executing Python code");
    const EPOCH_TICK_MS: u64 = 10;
    let was_cancelled = Arc::new(AtomicBool::new(false));
    // Spawn a ticker thread only when we actually need interruption. The
    // thread bumps the engine epoch every EPOCH_TICK_MS; the deadline below is
    // therefore expressed in ticks of that interval.
    let epoch_ticker = if execution_timeout.is_some() || cancellation_token.is_some() {
        if let Some(timeout) = execution_timeout {
            let ticks_until_timeout = timeout.as_millis() as u64 / EPOCH_TICK_MS;
            // At least one tick so a sub-10ms timeout still traps.
            let ticks = ticks_until_timeout.max(1);
            store.set_epoch_deadline(ticks);
        } else {
            // Cancellation-only: a large deadline that is normally never
            // reached; the cancellation burst below overshoots it on purpose.
            const CANCELLATION_DEADLINE: u64 = 10000;
            store.set_epoch_deadline(CANCELLATION_DEADLINE);
        }
        store.epoch_deadline_trap();
        let engine = self.engine.clone();
        let stop_flag = Arc::new(AtomicBool::new(false));
        let stop_flag_clone = Arc::clone(&stop_flag);
        let was_cancelled_clone = Arc::clone(&was_cancelled);
        let cancel_token = cancellation_token.clone();
        std::thread::spawn(move || {
            while !stop_flag_clone.load(Ordering::Relaxed) {
                if let Some(ref token) = cancel_token
                    && token.is_cancelled()
                {
                    was_cancelled_clone.store(true, Ordering::Relaxed);
                    // Burst past the 10000-tick cancellation deadline so the
                    // guest traps immediately (10001 > CANCELLATION_DEADLINE).
                    for _ in 0..10001 {
                        engine.increment_epoch();
                    }
                    break;
                }
                std::thread::sleep(Duration::from_millis(EPOCH_TICK_MS));
                engine.increment_epoch();
            }
        });
        Some(stop_flag)
    } else {
        // No interruption requested: keep the deadline unreachable but still
        // install the trap handler for consistency.
        store.set_epoch_deadline(u64::MAX / 2);
        store.epoch_deadline_trap();
        None::<Arc<AtomicBool>>
    };
    let code_owned = code.to_string();
    let mut async_timeout_elapsed = false;
    // Second enforcement layer: a tokio timeout around the whole run, in case
    // epoch interruption alone does not unblock (e.g. host-side awaits).
    let wasmtime_result = if let Some(timeout) = execution_timeout {
        match tokio::time::timeout(
            timeout,
            store.run_concurrent(async |accessor| {
                bindings.call_execute(accessor, code_owned).await
            }),
        )
        .await
        {
            Ok(result) => result,
            Err(_elapsed) => {
                async_timeout_elapsed = true;
                Err(wasmtime::Error::msg("async timeout elapsed"))
            }
        }
    } else {
        store
            .run_concurrent(async |accessor| bindings.call_execute(accessor, code_owned).await)
            .await
    };
    // Stop the ticker thread regardless of outcome.
    if let Some(stop_flag) = epoch_ticker {
        stop_flag.store(true, Ordering::Relaxed);
    }
    // Map low-level wasmtime failures onto domain errors: epoch interrupts
    // become Cancelled/Timeout (disambiguated by the was_cancelled flag),
    // fuel exhaustion reports consumed vs. limit, everything else is a
    // generic execution error.
    let wasmtime_result = wasmtime_result.map_err(|e| {
        if async_timeout_elapsed
            || e.downcast_ref::<wasmtime::Trap>() == Some(&wasmtime::Trap::Interrupt)
        {
            if was_cancelled.load(Ordering::Relaxed) {
                Error::Cancelled
            } else {
                Error::Timeout(execution_timeout.unwrap_or_default())
            }
        } else if e.downcast_ref::<wasmtime::Trap>() == Some(&wasmtime::Trap::OutOfFuel) {
            let remaining = store.get_fuel().unwrap_or(0);
            let consumed = initial_fuel.saturating_sub(remaining);
            let limit = fuel_limit.unwrap_or(u64::MAX);
            Error::FuelExhausted { consumed, limit }
        } else {
            Error::Execution(format!("WASM execution error: {e:?}"))
        }
    })?;
    // Two error layers remain: the component call itself, then the guest's
    // own reported execution error (a plain string).
    let wit_output = wasmtime_result
        .map_err(|e| Error::Execution(format!("WASM execution error: {e:?}")))?
        .map_err(Error::Execution)?;
    let peak_memory_bytes = store.data().memory_tracker.peak_memory_bytes();
    let remaining_fuel = store.get_fuel().unwrap_or(0);
    let fuel_consumed = Some(initial_fuel.saturating_sub(remaining_fuel));
    // NOTE(review): duration is reported as Duration::ZERO and the following
    // argument as 0 here — presumably measured/filled by the caller; confirm
    // against ExecutionOutput::new.
    Ok(ExecutionOutput::new(
        wit_output.stdout,
        wit_output.stderr,
        peak_memory_bytes,
        Duration::ZERO, 0, fuel_consumed,
    ))
}
}
/// Extracts the string field `key` from a JSON value, returning `default`
/// when the field is absent or not a JSON string.
fn json_str_field(data: &serde_json::Value, key: &str, default: &str) -> String {
    data.get(key)
        .and_then(|v| v.as_str())
        .unwrap_or(default)
        .to_string()
}

/// Parses a raw guest [`TraceRequest`] into a structured [`TraceEvent`].
///
/// `event_json` must be a JSON object carrying a `"type"` tag (`line`,
/// `call`, `return`, `exception`, `callback_start`, `callback_end`); a
/// missing or unrecognized tag degrades to a plain line event rather than
/// failing. `context_json` is best-effort: an empty or unparsable context
/// becomes `None`.
///
/// # Errors
///
/// Returns [`Error::Serialization`] when `event_json` is not valid JSON.
pub fn parse_trace_event(request: &TraceRequest) -> std::result::Result<TraceEvent, Error> {
    let event_data: serde_json::Value = serde_json::from_str(&request.event_json)
        .map_err(|e| Error::Serialization(e.to_string()))?;
    let event_type = event_data
        .get("type")
        .and_then(|v| v.as_str())
        .unwrap_or("unknown");
    // Context is optional and best-effort: malformed context never fails the event.
    let context: Option<serde_json::Value> = if request.context_json.is_empty() {
        None
    } else {
        serde_json::from_str(&request.context_json).ok()
    };
    let kind = match event_type {
        "call" => crate::trace::TraceEventKind::Call {
            function: json_str_field(&event_data, "function", "<unknown>"),
        },
        "return" => crate::trace::TraceEventKind::Return {
            function: json_str_field(&event_data, "function", "<unknown>"),
        },
        "exception" => crate::trace::TraceEventKind::Exception {
            message: json_str_field(&event_data, "message", ""),
        },
        "callback_start" => crate::trace::TraceEventKind::CallbackStart {
            name: json_str_field(&event_data, "name", "<unknown>"),
        },
        "callback_end" => crate::trace::TraceEventKind::CallbackEnd {
            name: json_str_field(&event_data, "name", "<unknown>"),
            // Duration is not carried in the event payload; reported as 0.
            duration_ms: 0,
        },
        // "line" and any unrecognized type map to a plain line event.
        _ => crate::trace::TraceEventKind::Line,
    };
    Ok(TraceEvent {
        lineno: request.lineno,
        event: kind,
        context,
    })
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Convenience constructor for the `TraceRequest` shape used by these tests.
    fn make_request(lineno: u32, event_json: &str, context_json: &str) -> TraceRequest {
        TraceRequest {
            lineno,
            event_json: event_json.to_string(),
            context_json: context_json.to_string(),
        }
    }

    #[test]
    fn test_parse_trace_event_line() {
        let req = make_request(42, r#"{"type": "line"}"#, "");
        let parsed = parse_trace_event(&req).unwrap();
        assert_eq!(parsed.lineno, 42);
        assert!(matches!(parsed.event, crate::trace::TraceEventKind::Line));
    }

    #[test]
    fn test_parse_trace_event_call() {
        let req = make_request(10, r#"{"type": "call", "function": "my_func"}"#, "");
        let parsed = parse_trace_event(&req).unwrap();
        assert_eq!(parsed.lineno, 10);
        match &parsed.event {
            crate::trace::TraceEventKind::Call { function } => assert_eq!(function, "my_func"),
            _ => panic!("Expected Call event"),
        }
    }

    #[test]
    fn test_parse_trace_event_callback() {
        let req = make_request(
            0,
            r#"{"type": "callback_start", "name": "http.get"}"#,
            r#"{"url": "https://example.com"}"#,
        );
        let parsed = parse_trace_event(&req).unwrap();
        assert!(parsed.context.is_some());
        match &parsed.event {
            crate::trace::TraceEventKind::CallbackStart { name } => {
                assert_eq!(name, "http.get");
            }
            _ => panic!("Expected CallbackStart event"),
        }
    }

    #[test]
    #[cfg(any(feature = "embedded", feature = "preinit"))]
    #[allow(unsafe_code)]
    fn test_cpu_feature_levels_use_valid_cranelift_flags() {
        use std::env;

        let levels = [
            "x86-64",
            "x86-64-v1",
            "x86-64-v2",
            "x86-64-v3",
            "x86-64-v4",
            "native",
        ];
        for level in levels {
            // SAFETY: process-env mutation is unsynchronized; this test
            // assumes no concurrent readers/writers of these variables.
            unsafe {
                env::set_var("ERYX_CPU_FEATURES", level);
                env::remove_var("ERYX_CRANELIFT_FLAGS");
                env::remove_var("ERYX_TARGET");
            }
            let result = PythonExecutor::create_engine_with_target(None);
            assert!(
                result.is_ok(),
                "CPU feature level '{level}' failed to create engine: {:?}",
                result.err()
            );
        }
        // SAFETY: same assumption as above.
        unsafe {
            env::remove_var("ERYX_CPU_FEATURES");
        }
    }
}