use afterburner_core::{
AfterburnerError, BurnCache, BurnCacheBackend, Combustor, FuelGauge, HostContext,
InMemoryStateStore, Manifold, Result, ScriptId, ScriptInvocation, ScriptOutcome,
SharedStateStore,
};
use serde_json::Value;
use std::fmt;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
/// Execution backend selected when building an [`Afterburner`].
///
/// `Wasm` and `Adaptive` only exist when the matching cargo feature is enabled.
/// `Native` is always declared, but building with it returns an error unless the
/// `native` feature is on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Mode {
/// Built by `build_native` (an `afterburner_ignite::NativeCombustor` when available).
Native,
/// Built by `build_wasm` (an `afterburner_wasi::WasmCombustor`).
#[cfg(feature = "wasm")]
Wasm,
/// Built by `build_adaptive` (an `afterburner_adaptive::AdaptiveCombustor`).
#[cfg(feature = "adaptive")]
Adaptive,
}
// The default is chosen by which cargo features are enabled, so it is written by hand.
// NOTE(review): the allow presumably silences clippy for feature sets where the body
// reduces to a single variant — confirm.
#[allow(clippy::derivable_impls)]
impl Default for Mode {
/// Returns the preferred backend for the enabled features: `adaptive` wins over
/// `wasm`, which wins over `native`. The cfg conditions below are mutually
/// exclusive, so exactly one of them is compiled.
fn default() -> Self {
#[cfg(feature = "adaptive")]
{
Mode::Adaptive
}
#[cfg(all(feature = "wasm", not(feature = "adaptive")))]
{
Mode::Wasm
}
#[cfg(all(not(feature = "wasm"), not(feature = "adaptive"), feature = "native"))]
{
Mode::Native
}
// No backend feature at all: fail the build with an explicit message.
#[cfg(all(
not(feature = "wasm"),
not(feature = "adaptive"),
not(feature = "native")
))]
compile_error!("afterburner requires at least one of the features: wasm, native, adaptive");
}
}
/// The engine an [`Afterburner`] dispatches its calls to.
enum EngineHolder {
/// A `BurnCache` wrapping one combustor; produced by `AfterburnerBuilder::build`.
Cache(BurnCache),
/// A shared `ThrustEngine`; produced by `ThreadedBuilder::build` (`thrust` feature only).
#[cfg(feature = "thrust")]
Thrust(Arc<afterburner_thrust::ThrustEngine>),
}
impl fmt::Debug for EngineHolder {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
EngineHolder::Cache(_) => f.debug_tuple("Cache").finish(),
#[cfg(feature = "thrust")]
EngineHolder::Thrust(_) => f.debug_tuple("Thrust").finish(),
}
}
}
/// Facade over a script engine: registers script sources and runs them under a
/// set of default limits (`FuelGauge`).
pub struct Afterburner {
/// Engine that every register/run call is dispatched to.
engine: EngineHolder,
/// Limits used by `run`, `run_batch` and `run_script` when the caller passes none.
defaults: FuelGauge,
/// State store handed to the engine at build time. It is stored but not read in
/// this file. NOTE(review): presumably retained to keep the store alive — confirm.
_state_store: SharedStateStore,
/// Working directory exposed to scripts through the prelude that `register` prepends.
cwd: Option<PathBuf>,
/// Flow engine backing `register_bundle`; `None` unless `.flow()` was called on the builder.
#[cfg(feature = "flow")]
flow: Option<afterburner_flow::FlowEngine>,
}
impl fmt::Debug for Afterburner {
    /// Shows the engine variant and the default limits; all other fields are elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Afterburner");
        out.field("engine", &self.engine);
        out.field("defaults", &self.defaults);
        out.finish_non_exhaustive()
    }
}
impl Afterburner {
    /// Creates an engine from a builder with no options set.
    ///
    /// # Errors
    /// Propagates any error returned by [`AfterburnerBuilder::build`].
    pub fn new() -> Result<Self> {
        AfterburnerBuilder::default().build()
    }

    /// Starts a new [`AfterburnerBuilder`] with every option unset.
    pub fn builder() -> AfterburnerBuilder {
        AfterburnerBuilder::default()
    }

    /// Registers `source` with the active engine and returns its id.
    ///
    /// When a usable working directory is configured, the snippet produced by
    /// `cwd_prelude` is prepended to `source` before registration.
    pub fn register(&self, source: &str) -> Result<ScriptId> {
        let prefixed = self
            .cwd_prelude()
            .map(|prelude| format!("{prelude}{source}"));
        let script = prefixed.as_deref().unwrap_or(source);
        match &self.engine {
            EngineHolder::Cache(cache) => cache.register(script),
            #[cfg(feature = "thrust")]
            EngineHolder::Thrust(thrust) => thrust.register(script),
        }
    }

    /// Builds the script prelude that publishes the configured working directory.
    ///
    /// The snippet assigns the JSON-encoded path to `globalThis.__host_cwd` and, if
    /// `globalThis.__plenum_refresh_entry_require` is a function, calls it and
    /// rebinds `require` to `globalThis.require`.
    ///
    /// Returns `None` when no directory is set, when the path is not valid UTF-8,
    /// or when JSON encoding fails.
    fn cwd_prelude(&self) -> Option<String> {
        let dir = self.cwd.as_deref()?.to_str()?;
        let cwd_json = serde_json::to_string(dir).ok()?;
        let prelude = format!(
            "globalThis.__host_cwd = {cwd_json};\n\
             if (typeof globalThis.__plenum_refresh_entry_require === 'function') {{\n\
             globalThis.__plenum_refresh_entry_require();\n\
             require = globalThis.require;\n\
             }}\n"
        );
        Some(prelude)
    }

    /// Loads a multi-module bundle through the flow engine.
    ///
    /// # Errors
    /// Returns `AfterburnerError::Engine` when the engine was built without
    /// `.flow()`; otherwise forwards the result of `load_bundle`.
    #[cfg(feature = "flow")]
    pub fn register_bundle(&self, entry: &str, modules: &[(String, String)]) -> Result<ScriptId> {
        if let Some(flow) = self.flow.as_ref() {
            return flow.load_bundle(entry, modules);
        }
        Err(AfterburnerError::Engine(
            "register_bundle requires flow mode; call .flow() on the builder".into(),
        ))
    }

    /// Runs a registered script with the engine's default limits.
    pub fn run(&self, id: &ScriptId, input: &Value) -> Result<Value> {
        let limits = &self.defaults;
        self.run_with(id, input, limits)
    }

    /// Runs a registered script with caller-supplied `limits`.
    ///
    /// The threaded engine takes owned arguments, so `input` and `limits` are
    /// cloned on that path.
    pub fn run_with(&self, id: &ScriptId, input: &Value, limits: &FuelGauge) -> Result<Value> {
        match &self.engine {
            EngineHolder::Cache(cache) => cache.execute(id, input, limits),
            #[cfg(feature = "thrust")]
            EngineHolder::Thrust(thrust) => {
                thrust.thrust_sync(id, input.clone(), limits.clone(), None)
            }
        }
    }

    /// Runs a registered script over a batch of inputs with the default limits.
    ///
    /// The cache engine delegates to `execute_batch`. The threaded engine requires
    /// `input` to be a JSON array, runs each element in order, and returns the
    /// results as a JSON array; the first failure aborts the batch.
    ///
    /// # Errors
    /// On the threaded engine, `AfterburnerError::Host` when `input` is not an array.
    pub fn run_batch(&self, id: &ScriptId, input: &Value) -> Result<Value> {
        match &self.engine {
            EngineHolder::Cache(cache) => cache.execute_batch(id, input, &self.defaults),
            #[cfg(feature = "thrust")]
            EngineHolder::Thrust(_) => {
                let items = input.as_array().ok_or_else(|| {
                    AfterburnerError::Host("run_batch: input must be array".into())
                })?;
                items
                    .iter()
                    .map(|item| self.run_with(id, item, &self.defaults))
                    .collect::<Result<Vec<Value>>>()
                    .map(Value::Array)
            }
        }
    }

    /// Runs `source` as a one-off script with a default invocation and default limits.
    pub fn run_script(&self, source: &str) -> Result<ScriptOutcome> {
        let invocation = ScriptInvocation::default();
        self.run_script_with(source, &invocation, &self.defaults)
    }

    /// Runs `source` as a one-off script with an explicit invocation and limits.
    ///
    /// # Errors
    /// Always returns `AfterburnerError::Engine` on the threaded engine, which does
    /// not support this entry point.
    pub fn run_script_with(
        &self,
        source: &str,
        invocation: &ScriptInvocation,
        limits: &FuelGauge,
    ) -> Result<ScriptOutcome> {
        match &self.engine {
            EngineHolder::Cache(cache) => cache.run_script(source, invocation, limits),
            #[cfg(feature = "thrust")]
            EngineHolder::Thrust(_) => {
                let err = AfterburnerError::Engine(
                    "run_script requires a single-threaded engine; \
                     construct `Afterburner::builder()` without .threaded()"
                        .into(),
                );
                Err(err)
            }
        }
    }

    /// Asks the cache engine to forget `id`. On the threaded engine this does nothing.
    pub fn unload(&self, id: &ScriptId) {
        match &self.engine {
            EngineHolder::Cache(cache) => cache.forget(id),
            #[cfg(feature = "thrust")]
            EngineHolder::Thrust(_) => {}
        }
    }

    /// Returns the limits applied when a call does not supply its own.
    pub fn default_limits(&self) -> &FuelGauge {
        &self.defaults
    }
}
/// Collects the options for an [`Afterburner`]; every field starts unset.
#[derive(Default)]
pub struct AfterburnerBuilder {
/// Requested backend; `Mode::default()` is used at build time when unset.
mode: Option<Mode>,
/// Default fuel budget copied into the engine's default `FuelGauge`.
fuel: Option<u64>,
/// Default memory limit copied into the engine's default `FuelGauge`.
memory_bytes: Option<usize>,
/// Default timeout copied into the engine's default `FuelGauge`.
timeout_ms: Option<u64>,
/// Manifold for the default limits; `Manifold::sealed()` is used when unset.
manifold: Option<Manifold>,
/// Optional host context handed to the combustor.
host_context: Option<Arc<dyn HostContext>>,
/// State store; `InMemoryStateStore::shared()` is used when unset.
state_store: Option<SharedStateStore>,
/// Optional backend installed on the `BurnCache`.
cache_backend: Option<Arc<dyn BurnCacheBackend>>,
/// Working directory exposed to scripts at registration time.
cwd: Option<PathBuf>,
/// Set by `.flow()`; makes `build` create a `FlowEngine`.
#[cfg(feature = "flow")]
use_flow: bool,
}
impl fmt::Debug for AfterburnerBuilder {
    /// Prints the plain options verbatim and reports the trait-object options only
    /// as "is it set" booleans; remaining fields are elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let has_host_context = self.host_context.is_some();
        let has_state_store = self.state_store.is_some();
        let has_cache_backend = self.cache_backend.is_some();

        let mut out = f.debug_struct("AfterburnerBuilder");
        out.field("mode", &self.mode);
        out.field("fuel", &self.fuel);
        out.field("memory_bytes", &self.memory_bytes);
        out.field("timeout_ms", &self.timeout_ms);
        out.field("manifold", &self.manifold);
        out.field("host_context", &has_host_context);
        out.field("state_store", &has_state_store);
        out.field("cache_backend", &has_cache_backend);
        out.finish_non_exhaustive()
    }
}
impl AfterburnerBuilder {
    /// Selects the execution backend; when unset, `Mode::default()` is used at build time.
    pub fn mode(mut self, mode: Mode) -> Self {
        self.mode = Some(mode);
        self
    }

    /// Sets the default fuel budget stored in the engine's default limits.
    pub fn fuel(mut self, fuel: u64) -> Self {
        self.fuel = Some(fuel);
        self
    }

    /// Sets the default memory limit stored in the engine's default limits.
    pub fn memory_bytes(mut self, bytes: usize) -> Self {
        self.memory_bytes = Some(bytes);
        self
    }

    /// Sets the default timeout stored in the engine's default limits.
    pub fn timeout_ms(mut self, ms: u64) -> Self {
        self.timeout_ms = Some(ms);
        self
    }

    /// Sets the manifold used by the default limits (`Manifold::sealed()` when unset).
    pub fn manifold(mut self, m: Manifold) -> Self {
        self.manifold = Some(m);
        self
    }

    /// Supplies the host context handed to the combustor.
    pub fn host_context(mut self, ctx: Arc<dyn HostContext>) -> Self {
        self.host_context = Some(ctx);
        self
    }

    /// Supplies the state store (`InMemoryStateStore::shared()` when unset).
    pub fn state_store(mut self, store: SharedStateStore) -> Self {
        self.state_store = Some(store);
        self
    }

    /// Installs a backend on the `BurnCache` created by [`build`](Self::build).
    pub fn cache_backend(mut self, backend: Arc<dyn BurnCacheBackend>) -> Self {
        self.cache_backend = Some(backend);
        self
    }

    /// Sets the working directory exposed to scripts at registration time.
    pub fn cwd(mut self, path: impl Into<PathBuf>) -> Self {
        self.cwd = Some(path.into());
        self
    }

    /// Switches to the threaded engine builder with `workers` compute workers.
    ///
    /// The remaining threaded options start at `0`/`None`, with a 5 second
    /// shutdown drain deadline.
    /// NOTE(review): a `0` capacity/worker count presumably lets the thrust engine
    /// pick its own default — confirm against `ThrustEngineConfig`.
    #[cfg(feature = "thrust")]
    pub fn threaded(self, workers: usize) -> ThreadedBuilder {
        ThreadedBuilder {
            parent: self,
            workers,
            io_workers: 0,
            tokens_per_sec: None,
            burst_tokens: 0,
            local_queue_capacity: 0,
            injector_capacity: 0,
            shutdown_drain_deadline: Duration::from_secs(5),
        }
    }

    /// Enables flow mode and fills any limit that is still unset from
    /// `afterburner_flow::default_fuel_gauge()`.
    ///
    /// Limits set before this call are kept; limits set afterwards overwrite the
    /// flow defaults.
    #[cfg(feature = "flow")]
    pub fn flow(mut self) -> Self {
        self.use_flow = true;
        let flow_defaults = afterburner_flow::default_fuel_gauge();
        self.fuel = self.fuel.or(flow_defaults.fuel);
        self.memory_bytes = self.memory_bytes.or(flow_defaults.memory_bytes);
        self.timeout_ms = self.timeout_ms.or(flow_defaults.timeout_ms);
        self
    }

    /// Builds a single-threaded [`Afterburner`] backed by a `BurnCache`.
    ///
    /// # Errors
    /// Propagates failures from creating the flow engine (when enabled) or the
    /// combustor for the selected mode.
    pub fn build(self) -> Result<Afterburner> {
        let state_store = self.state_store.unwrap_or_else(InMemoryStateStore::shared);
        // `self` is consumed, so owned fields are moved into place rather than cloned.
        let defaults = FuelGauge {
            fuel: self.fuel,
            memory_bytes: self.memory_bytes,
            timeout_ms: self.timeout_ms,
            manifold: self.manifold.unwrap_or_else(Manifold::sealed),
        };
        let mode = self.mode.unwrap_or_default();
        #[cfg(feature = "flow")]
        let flow = if self.use_flow {
            Some(afterburner_flow::FlowEngine::with_fuel(defaults.clone())?)
        } else {
            None
        };
        // The store is cloned here because the finished engine also keeps a handle to it.
        let combustor: Box<dyn Combustor> =
            build_combustor(mode, state_store.clone(), self.host_context)?;
        let mut cache = BurnCache::new(combustor);
        if let Some(backend) = self.cache_backend {
            cache = cache.with_backend(backend);
        }
        Ok(Afterburner {
            engine: EngineHolder::Cache(cache),
            defaults,
            _state_store: state_store,
            cwd: self.cwd,
            #[cfg(feature = "flow")]
            flow,
        })
    }
}
/// Creates the combustor for `mode` by delegating to the matching `build_*` helper.
///
/// Returns whatever error the selected helper produces.
// NOTE(review): both parameters are used by every arm as written, so this allow may
// be a leftover — confirm before removing.
#[allow(unused_variables)]
fn build_combustor(
mode: Mode,
state_store: SharedStateStore,
host_context: Option<Arc<dyn HostContext>>,
) -> Result<Box<dyn Combustor>> {
match mode {
// `build_native` exists in both feature configurations; without the `native`
// feature it returns an error.
Mode::Native => build_native(state_store, host_context),
#[cfg(feature = "wasm")]
Mode::Wasm => build_wasm(state_store, host_context),
#[cfg(feature = "adaptive")]
Mode::Adaptive => build_adaptive(state_store, host_context),
}
}
/// Creates a native combustor over `state_store`, attaching `host_context` when given.
#[cfg(feature = "native")]
fn build_native(
    state_store: SharedStateStore,
    host_context: Option<Arc<dyn HostContext>>,
) -> Result<Box<dyn Combustor>> {
    let base = afterburner_ignite::NativeCombustor::with_state_store(state_store)?;
    let combustor = match host_context {
        Some(ctx) => base.with_host_context(ctx),
        None => base,
    };
    Ok(Box::new(combustor))
}
/// Stand-in used when the `native` feature is off: always reports that native
/// mode is unavailable.
#[cfg(not(feature = "native"))]
fn build_native(
    _state_store: SharedStateStore,
    _host_context: Option<Arc<dyn HostContext>>,
) -> Result<Box<dyn Combustor>> {
    let reason = "native mode requested but the `native` feature is not enabled";
    Err(AfterburnerError::Engine(reason.into()))
}
/// Creates a wasm combustor configured with `state_store` and the optional
/// `host_context`; no transpile hook is installed.
#[cfg(feature = "wasm")]
fn build_wasm(
    state_store: SharedStateStore,
    host_context: Option<Arc<dyn HostContext>>,
) -> Result<Box<dyn Combustor>> {
    let combustor = afterburner_wasi::WasmCombustor::new(afterburner_wasi::WasmConfig {
        state_store: Some(state_store),
        host_context,
        transpile_hook: None,
    })?;
    Ok(Box::new(combustor))
}
/// Creates an adaptive combustor from a wasm configuration carrying `state_store`
/// and the optional `host_context`; no transpile hook is installed.
#[cfg(feature = "adaptive")]
fn build_adaptive(
    state_store: SharedStateStore,
    host_context: Option<Arc<dyn HostContext>>,
) -> Result<Box<dyn Combustor>> {
    let wasm_config = afterburner_wasi::WasmConfig {
        state_store: Some(state_store),
        host_context,
        transpile_hook: None,
    };
    let combustor = afterburner_adaptive::AdaptiveCombustor::with_wasm_config(wasm_config)?;
    Ok(Box::new(combustor))
}
/// Options for the threaded (`thrust`) engine, layered on an [`AfterburnerBuilder`].
#[cfg(feature = "thrust")]
pub struct ThreadedBuilder {
/// Base builder supplying limits, manifold, state store, host context and cwd.
parent: AfterburnerBuilder,
/// Forwarded as `compute_workers`.
workers: usize,
/// Forwarded as `io_workers`.
io_workers: usize,
/// Forwarded as `admission_tokens_per_sec`.
tokens_per_sec: Option<u64>,
/// Forwarded as `admission_burst_tokens`.
burst_tokens: u64,
/// Forwarded as `local_queue_capacity`.
local_queue_capacity: usize,
/// Forwarded as `injector_capacity`.
injector_capacity: usize,
/// Forwarded as `shutdown_drain_deadline`; `threaded` initialises it to 5 seconds.
shutdown_drain_deadline: Duration,
}
#[cfg(feature = "thrust")]
impl fmt::Debug for ThreadedBuilder {
    /// Prints the threaded options; the wrapped parent builder is elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("ThreadedBuilder");
        out.field("workers", &self.workers);
        out.field("io_workers", &self.io_workers);
        out.field("tokens_per_sec", &self.tokens_per_sec);
        out.field("burst_tokens", &self.burst_tokens);
        out.field("local_queue_capacity", &self.local_queue_capacity);
        out.field("injector_capacity", &self.injector_capacity);
        out.field("shutdown_drain_deadline", &self.shutdown_drain_deadline);
        out.finish_non_exhaustive()
    }
}
#[cfg(feature = "thrust")]
impl ThreadedBuilder {
    /// Sets the number of I/O workers.
    pub fn io_workers(mut self, n: usize) -> Self {
        self.io_workers = n;
        self
    }

    /// Sets the admission rate forwarded as `admission_tokens_per_sec`.
    pub fn admission_tokens_per_sec(mut self, rate: u64) -> Self {
        self.tokens_per_sec = Some(rate);
        self
    }

    /// Sets the admission burst size forwarded as `admission_burst_tokens`.
    pub fn admission_burst(mut self, tokens: u64) -> Self {
        self.burst_tokens = tokens;
        self
    }

    /// Sets the per-worker local queue capacity.
    pub fn local_queue_capacity(mut self, cap: usize) -> Self {
        self.local_queue_capacity = cap;
        self
    }

    /// Sets the injector queue capacity.
    pub fn injector_capacity(mut self, cap: usize) -> Self {
        self.injector_capacity = cap;
        self
    }

    /// Sets the shutdown drain deadline.
    pub fn shutdown_drain_deadline(mut self, d: Duration) -> Self {
        self.shutdown_drain_deadline = d;
        self
    }

    /// Builds an [`Afterburner`] backed by the threaded engine.
    ///
    /// Limits, manifold, state store, host context and cwd come from the parent
    /// builder.
    /// NOTE(review): the parent's `mode`, `cache_backend` and flow selection are not
    /// consulted here, and `flow` is always `None` — confirm this is intended.
    ///
    /// # Errors
    /// Propagates any failure from `ThrustEngine::new`.
    pub fn build(self) -> Result<Afterburner> {
        // `self` is consumed, so the parent's owned fields are moved rather than cloned.
        let state_store = self
            .parent
            .state_store
            .unwrap_or_else(InMemoryStateStore::shared);
        let defaults = FuelGauge {
            fuel: self.parent.fuel,
            memory_bytes: self.parent.memory_bytes,
            timeout_ms: self.parent.timeout_ms,
            manifold: self.parent.manifold.unwrap_or_else(Manifold::sealed),
        };
        let wasm_config = afterburner_wasi::WasmConfig {
            // Cloned because the finished engine also keeps a handle to the store.
            state_store: Some(state_store.clone()),
            host_context: self.parent.host_context,
            transpile_hook: None,
        };
        let cfg = afterburner_thrust::ThrustEngineConfig {
            compute_workers: self.workers,
            io_workers: self.io_workers,
            admission_tokens_per_sec: self.tokens_per_sec,
            admission_burst_tokens: self.burst_tokens,
            local_queue_capacity: self.local_queue_capacity,
            injector_capacity: self.injector_capacity,
            shutdown_drain_deadline: self.shutdown_drain_deadline,
            wasm_config,
        };
        let engine = afterburner_thrust::ThrustEngine::new(cfg)?;
        Ok(Afterburner {
            engine: EngineHolder::Thrust(engine),
            defaults,
            _state_store: state_store,
            cwd: self.parent.cwd,
            #[cfg(feature = "flow")]
            flow: None,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// The default engine registers and runs a simple script.
    #[test]
    fn new_runs_trivial_script() {
        let ab = Afterburner::new().expect("new");
        let id = ab
            .register("module.exports = (d) => d.n + 1")
            .expect("register");
        let out = ab.run(&id, &json!({ "n": 41 })).expect("run");
        assert_eq!(out, json!(42));
    }

    /// Registering the same source twice yields the same hash.
    #[test]
    fn register_is_idempotent() {
        let ab = Afterburner::new().unwrap();
        let id1 = ab.register("module.exports = (d) => d").unwrap();
        let id2 = ab.register("module.exports = (d) => d").unwrap();
        assert_eq!(id1.hash, id2.hash);
    }

    #[cfg(feature = "native")]
    #[test]
    fn native_mode_works() {
        let ab = Afterburner::builder().mode(Mode::Native).build().unwrap();
        let id = ab.register("module.exports = (d) => d * 3").unwrap();
        let out = ab.run(&id, &json!(7)).unwrap();
        assert_eq!(out, json!(21));
    }

    #[cfg(feature = "wasm")]
    #[test]
    fn wasm_mode_works() {
        let ab = Afterburner::builder().mode(Mode::Wasm).build().unwrap();
        let id = ab.register("module.exports = (d) => d * 2").unwrap();
        let out = ab.run(&id, &json!(21)).unwrap();
        assert_eq!(out, json!(42));
    }

    #[cfg(feature = "adaptive")]
    #[test]
    fn adaptive_mode_works() {
        let ab = Afterburner::builder().mode(Mode::Adaptive).build().unwrap();
        let id = ab.register("module.exports = (d) => d + 1").unwrap();
        let out = ab.run(&id, &json!(99)).unwrap();
        assert_eq!(out, json!(100));
    }

    // Gated on `native`: without that feature `Mode::Native` fails to build and the
    // `unwrap` below would panic instead of exercising the fuel limit.
    #[cfg(feature = "native")]
    #[test]
    fn builder_applies_fuel_limit() {
        let ab = Afterburner::builder()
            .mode(Mode::Native)
            .fuel(10_000)
            .build()
            .unwrap();
        let id = ab
            .register("module.exports = () => { while (true) {} }")
            .unwrap();
        let out = ab.run(&id, &json!(null));
        assert!(matches!(out, Err(AfterburnerError::FuelExhausted)));
    }

    #[cfg(feature = "thrust")]
    #[test]
    fn threaded_mode_runs_trivially() {
        let ab = Afterburner::builder()
            .threaded(2)
            .build()
            .expect("threaded build");
        let id = ab.register("module.exports = (d) => d.n + 1").unwrap();
        let out = ab.run(&id, &json!({ "n": 5 })).unwrap();
        assert_eq!(out, json!(6));
    }

    #[cfg(feature = "thrust")]
    #[test]
    fn threaded_run_batch_returns_array() {
        let ab = Afterburner::builder().threaded(2).build().unwrap();
        let id = ab.register("module.exports = (d) => d.x * 2").unwrap();
        let input = json!([{ "x": 1 }, { "x": 2 }, { "x": 3 }]);
        let out = ab.run_batch(&id, &input).unwrap();
        assert_eq!(out, json!([2, 4, 6]));
    }

    /// Only checks that `register_bundle` is callable in flow mode; the outcome of
    /// loading the bundle is deliberately not asserted.
    #[cfg(feature = "flow")]
    #[test]
    fn flow_mode_exposes_register_bundle() {
        let ab = Afterburner::builder().flow().build().unwrap();
        let entry = "main";
        let modules = vec![
            (
                "main".to_string(),
                "import { two } from './helper'; module.exports = (d) => d + two;".to_string(),
            ),
            ("helper".to_string(), "export const two = 2;".to_string()),
        ];
        let _ = ab.register_bundle(entry, &modules);
    }

    /// An engine built without `.flow()` rejects bundles with an engine error.
    #[cfg(feature = "flow")]
    #[test]
    fn register_bundle_without_flow_mode_errors() {
        let ab = Afterburner::new().unwrap();
        let out = ab.register_bundle("main", &[]);
        assert!(matches!(out, Err(AfterburnerError::Engine(_))));
    }

    /// Placeholder: without the `flow` feature `register_bundle` is not compiled,
    /// so there is nothing to call here.
    #[cfg(not(feature = "flow"))]
    #[test]
    fn register_bundle_requires_flow_mode_feature() {}

    // Gated on `native` for the same reason as `builder_applies_fuel_limit`.
    #[cfg(feature = "native")]
    #[test]
    fn run_with_overrides_per_call_limits() {
        let ab = Afterburner::builder()
            .mode(Mode::Native)
            .fuel(u64::MAX)
            .build()
            .unwrap();
        let id = ab
            .register("module.exports = () => { while (true) {} }")
            .unwrap();
        let strict = FuelGauge {
            fuel: Some(10_000),
            ..FuelGauge::unlimited()
        };
        let out = ab.run_with(&id, &json!(null), &strict);
        assert!(matches!(out, Err(AfterburnerError::FuelExhausted)));
    }
}