use crate::AfterburnerError;
use crate::wasm::{DaemonHttp, DaemonShardPool, ShardPoolConfig, WasmCombustor, WasmConfig};
use crate::{EnvAccess, ScriptInvocation};
use afterburner_wasi::daemon_workers::WorkerConfig;
use anyhow::{Context, Result};
use std::collections::BTreeMap;
use std::io::Write;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::available_parallelism;
use std::time::Duration;
use super::args::Cli;
use super::manifold::build_manifold;
/// Largest shard count an explicit `BURN_SHARDS` request may resolve to.
const MAX_SHARDS: usize = 128;

/// Picks the number of daemon shards to request.
///
/// With `BURN_SHARDS` unset (or unreadable as Unicode) the result is
/// `available_parallelism()`, falling back to `1` when that query fails.
/// With `BURN_SHARDS` set, the trimmed value is parsed as a `usize`:
///
/// * not parseable, or `0` — warn on stderr and use the auto-detected value;
/// * above [`MAX_SHARDS`] — warn on stderr and clamp to [`MAX_SHARDS`];
/// * anything else — used as-is, with an oversubscription warning on stderr
///   when it exceeds the auto-detected value.
///
/// Failures to write a warning are ignored.
fn daemon_shard_count() -> usize {
    let auto = available_parallelism().map_or(1, |n| n.get());

    let Ok(env) = std::env::var("BURN_SHARDS") else {
        return auto;
    };

    let mut stderr = std::io::stderr();

    let Ok(n) = env.trim().parse::<usize>() else {
        let _ = writeln!(
            stderr,
            "burn: BURN_SHARDS={env:?} not a positive integer; using auto-detected {auto}"
        );
        return auto;
    };

    if n == 0 {
        let _ = writeln!(
            stderr,
            "burn: BURN_SHARDS=0 invalid (must be ≥ 1); using auto-detected {auto}"
        );
        return auto;
    }

    if n > MAX_SHARDS {
        let _ = writeln!(
            stderr,
            "burn: BURN_SHARDS={n} exceeds cap; clamping to {MAX_SHARDS}"
        );
        return MAX_SHARDS;
    }

    // Still honoured, but the operator is told it is unlikely to help.
    if n > auto {
        let _ = writeln!(
            stderr,
            "burn: BURN_SHARDS={n} > available_parallelism()={auto}; \
             oversubscribing dedicated-thread shards incurs context-switch \
             tax with no throughput benefit"
        );
    }
    n
}
/// Runs `source` as a daemon script, or hands off to the plain script runner in native mode.
///
/// When `cli.mode` is `"native"` (compared ASCII-case-insensitively) the call is delegated to
/// `super::script::execute` with the value produced by `super::build::build_afterburner`.
/// Otherwise a multi-threaded tokio runtime, a `WasmCombustor` and a `DaemonShardPool` are set
/// up, the first shard's captured init output is replayed on this process's stdout/stderr, and
/// the calling thread blocks until the pool reports no outstanding refs or a Ctrl-C / SIGTERM
/// (SIGTERM on unix only) sets the shared shutdown flag.
///
/// # Errors
/// Returns an error if the tokio runtime or the `WasmCombustor` cannot be constructed, or
/// whatever the native-mode path returns.
///
/// # Process exit
/// Init-bytecode compilation failures and pool spawn failures do not return: they write
/// `burn: <error>` to stderr and call `std::process::exit(1)`. A spawn failure of
/// `AfterburnerError::ProcessExit(code)` exits with `code` without printing anything.
pub fn execute(cli: &Cli, source: &str, script_label: &str, user_args: &[String]) -> Result<()> {
// Native mode bypasses the daemon machinery entirely.
if let Some(mode) = cli.mode.as_deref()
&& mode.eq_ignore_ascii_case("native")
{
return super::script::execute(
&super::build::build_afterburner(cli)?,
source,
script_label,
user_args,
cli,
);
}
let invocation = build_invocation(cli, script_label, user_args);
let manifold = build_manifold(cli);
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime")?;
// NOTE(review): the meaning of the `1024` argument is not visible from this file — confirm
// against `DaemonHttp::with_runtime`.
let daemon_http = DaemonHttp::with_runtime(rt.handle().clone(), 1024);
let combustor = WasmCombustor::new(WasmConfig {
state_store: None,
host_context: None,
transpile_hook: ts_transpile_hook(),
compile_cache_dir: None,
})
.context("wasm combustor")?;
// Compile once up front; the resulting bytecode is shared with the pool through an `Arc`.
let init_bytecode = match combustor.compile_daemon_init_bytecode(source, &invocation) {
Ok(bc) => Arc::new(bc),
Err(e) => {
let _ = std::io::stderr().write_all(format!("burn: {e}\n").as_bytes());
std::process::exit(1);
}
};
let shard_count = daemon_shard_count();
// Shared flag: set by the signal tasks below and by this function before tearing down.
let shutdown = Arc::new(AtomicBool::new(false));
let pool = match DaemonShardPool::spawn(ShardPoolConfig {
shard_count,
expand_only_for_http_listener: true,
engine: combustor.engine().clone(),
instance_pre: Arc::clone(combustor.instance_pre()),
init_bytecode: Arc::clone(&init_bytecode),
manifold,
state_store: Some(combustor.state_store().clone()),
host_context: None,
daemon_http: Arc::clone(&daemon_http),
transpile_hook: combustor.transpile_hook(),
worker_config: WorkerConfig::default(),
tokio_handle: rt.handle().clone(),
invocation,
shutdown: Arc::clone(&shutdown),
queue_depth_per_shard: None,
}) {
Ok(p) => p,
Err(e) => {
match e {
// The error carries an exit code: propagate it verbatim, with no extra output.
AfterburnerError::ProcessExit(code) => std::process::exit(code),
other => {
let _ = std::io::stderr().write_all(format!("burn: {other}\n").as_bytes());
std::process::exit(1);
}
}
}
};
// Startup banner on stderr, suppressed only when BURN_QUIET is exactly "1".
if !std::env::var("BURN_QUIET").is_ok_and(|v| v == "1") {
let actual = pool.shard_count();
// NOTE(review): this label only checks that BURN_SHARDS is set; `daemon_shard_count` falls
// back to the auto-detected value for invalid input, in which case the banner still says
// "BURN_SHARDS env var". This `source` shadows the function parameter inside this block only.
let source = if std::env::var("BURN_SHARDS").is_ok() {
"BURN_SHARDS env var"
} else {
"auto-detected from available CPUs"
};
let _ = writeln!(
std::io::stderr(),
"burn: daemon running {actual} shard(s) (requested {shard_count} via {source})\n\
burn: in-process JS state is per-shard; use require('afterburner:state') for shared state",
);
}
// Only the first init result's captured output is replayed; write errors are ignored.
if let Some(first) = pool.init_results().first() {
let mut so = std::io::stdout().lock();
let _ = so.write_all(&first.stdout);
let _ = so.flush();
drop(so);
let mut se = std::io::stderr().lock();
let _ = se.write_all(&first.stderr);
let _ = se.flush();
drop(se);
}
// Nothing holds a ref after init: tear down right away (pool first, then the runtime).
if !pool.any_has_refs() {
drop(pool);
rt.shutdown_timeout(Duration::from_secs(1));
return Ok(());
}
{
let shutdown = Arc::clone(&shutdown);
rt.spawn(async move {
// NOTE(review): the result of `ctrl_c()` is discarded, so the flag is set even if the
// future resolves with an error — confirm that an immediate shutdown is intended then.
let _ = tokio::signal::ctrl_c().await;
shutdown.store(true, Ordering::Release);
});
}
#[cfg(unix)]
{
let shutdown = Arc::clone(&shutdown);
rt.spawn(async move {
// Unlike the Ctrl-C task, the flag is only set when the SIGTERM stream was created.
if let Ok(mut sigterm) =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
{
let _ = sigterm.recv().await;
shutdown.store(true, Ordering::Release);
}
});
}
// Poll every 50 ms until a signal task sets the flag or the pool has no refs left.
while !shutdown.load(Ordering::Acquire) {
if !pool.any_has_refs() {
break;
}
std::thread::sleep(Duration::from_millis(50));
}
// Set the flag on the "no refs left" exit path too, then drop the pool before the runtime.
shutdown.store(true, Ordering::Release);
drop(pool);
rt.shutdown_timeout(Duration::from_secs(2));
Ok(())
}
/// Assembles the `ScriptInvocation` handed to the script.
///
/// `argv` is `["burn", script_label, ...user_args]`, `env` comes from `collect_env(cli)`,
/// and `cwd` from `super::script::cli_cwd()`.
fn build_invocation(cli: &Cli, script_label: &str, user_args: &[String]) -> ScriptInvocation {
    // Fixed two-element prefix followed by the user's own arguments, in order.
    let argv: Vec<String> = ["burn", script_label]
        .iter()
        .map(|part| part.to_string())
        .chain(user_args.iter().cloned())
        .collect();
    let env = collect_env(cli);
    let cwd = super::script::cli_cwd();
    ScriptInvocation { argv, env, cwd }
}
fn collect_env(cli: &Cli) -> BTreeMap<String, String> {
let manifold = build_manifold(cli);
let mut env: BTreeMap<String, String> = match &manifold.env {
EnvAccess::None => BTreeMap::new(),
EnvAccess::AllowList(keys) => keys
.iter()
.filter_map(|k| std::env::var(k).ok().map(|v| (k.clone(), v)))
.collect(),
EnvAccess::Full => std::env::vars().collect(),
};
for path in &cli.env_file {
if let Ok(text) = std::fs::read_to_string(path) {
for line in text.lines() {
let line = line.trim();
if line.is_empty() || line.starts_with('#') {
continue;
}
let Some(eq) = line.find('=') else { continue };
let key = line[..eq].trim();
if key.is_empty() {
continue;
}
let mut val = line[eq + 1..].trim();
if val.len() >= 2
&& ((val.starts_with('"') && val.ends_with('"'))
|| (val.starts_with('\'') && val.ends_with('\'')))
{
val = &val[1..val.len() - 1];
}
env.insert(key.to_string(), val.to_string());
}
}
}
env
}
/// Returns the label used for a script path.
///
/// This is the canonicalized path when `canonicalize` succeeds, otherwise the path exactly
/// as given. Either way the result is converted lossily to a `String`.
pub fn script_label(path: &Path) -> String {
    match path.canonicalize() {
        Ok(resolved) => resolved.to_string_lossy().into_owned(),
        // Canonicalization failed: keep what the caller passed in.
        Err(_) => path.to_string_lossy().into_owned(),
    }
}
/// Returns the transpile hook available when the `ts` feature is enabled.
///
/// The hook sends paths that `crate::ts::is_typescript` accepts through
/// `crate::ts::transpile`; every other path goes through `crate::ts::lower_esm_js`.
/// Errors from either call are reduced to their `to_string()` text.
#[cfg(feature = "ts")]
pub(super) fn ts_transpile_hook() -> Option<afterburner_wasi::host::TranspileFn> {
    let hook = |source: &str, path: &str| -> Result<String, String> {
        let file = std::path::PathBuf::from(path);
        if !crate::ts::is_typescript(&file) {
            return crate::ts::lower_esm_js(source, &file).map_err(|err| err.to_string());
        }
        crate::ts::transpile(source, &file).map_err(|err| err.to_string())
    };
    Some(Arc::new(hook))
}
/// Variant compiled when the `ts` feature is disabled: always returns `None`, so no
/// transpile hook is supplied.
#[cfg(not(feature = "ts"))]
pub(super) fn ts_transpile_hook() -> Option<afterburner_wasi::host::TranspileFn> {
None
}