use crate::AfterburnerError;
use crate::wasm::{
DaemonDgram, DaemonHttp, DaemonNet, DaemonRuntime, DaemonTls, DaemonWorkers, DgramEvent,
NetEvent, TlsEvent, WasmCombustor, WasmConfig, WorkerConfig, WorkerEvent,
};
use crate::{EnvAccess, ScriptInvocation};
use anyhow::{Context, Result};
use std::collections::BTreeMap;
use std::io::Write;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use super::args::Cli;
use super::manifold::build_manifold;
/// Runs `source` and, if the script keeps references alive, services its events until
/// shutdown.
///
/// When `cli.mode` is `"native"` (case-insensitive) the call is delegated to
/// `super::script::execute`. Otherwise a multi-threaded tokio runtime, a `WasmCombustor`
/// and a `DaemonRuntime` are created, the worker/net/tls/dgram subsystems are installed,
/// and the script's init bytecode is compiled and run.
///
/// # Process exit
///
/// This function terminates the process directly in these cases:
/// * init bytecode compilation fails — prints `burn: <error>` to stderr, exits with 1;
/// * init returns `AfterburnerError::ProcessExit(code)` — exits with `code`;
/// * init returns any other error — prints `burn: <error>` to stderr, exits with 1.
///
/// # Errors
///
/// Returns an error if the tokio runtime, the combustor or the daemon cannot be created,
/// if flushing captured output fails, or if the event loop reports an error.
pub fn execute(cli: &Cli, source: &str, script_label: &str, user_args: &[String]) -> Result<()> {
    if let Some(mode) = cli.mode.as_deref()
        && mode.eq_ignore_ascii_case("native")
    {
        return super::script::execute(
            &super::build::build_afterburner(cli)?,
            source,
            script_label,
            user_args,
            cli,
        );
    }
    let invocation = build_invocation(cli, script_label, user_args);
    let manifold = build_manifold(cli);
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .context("tokio runtime")?;
    // NOTE(review): 1024 is presumably a queue/channel capacity — confirm against
    // `DaemonHttp::with_runtime`.
    let daemon_http = DaemonHttp::with_runtime(rt.handle().clone(), 1024);
    let combustor = WasmCombustor::new(WasmConfig {
        state_store: None,
        host_context: None,
        transpile_hook: ts_transpile_hook(),
    })
    .context("wasm combustor")?;
    let mut daemon = DaemonRuntime::instantiate(
        combustor.engine(),
        combustor.instance_pre(),
        manifold.clone(),
        Some(combustor.state_store().clone()),
        None,
        Arc::clone(&daemon_http),
        combustor.transpile_hook(),
    )
    .context("daemon instantiate")?;
    let workers = DaemonWorkers::new_parent(manifold.clone(), WorkerConfig::default());
    daemon.install_workers(Arc::clone(&workers));
    let net = DaemonNet::new(rt.handle().clone(), manifold.clone());
    daemon.install_net(Arc::clone(&net));
    let tls = DaemonTls::new(rt.handle().clone(), manifold.clone());
    daemon.install_tls(Arc::clone(&tls));
    // Last consumer of `manifold`, so it is moved rather than cloned.
    let dgram = DaemonDgram::new(rt.handle().clone(), manifold);
    daemon.install_dgram(Arc::clone(&dgram));
    let init_bytecode = match combustor.compile_daemon_init_bytecode(source, &invocation) {
        Ok(bc) => bc,
        Err(e) => {
            let _ = std::io::stderr().write_all(format!("burn: {e}\n").as_bytes());
            std::process::exit(1);
        }
    };
    if let Err(e) = daemon.run_init_with_bytecode(&init_bytecode) {
        // Emit whatever the script printed before it failed or asked to exit.
        flush_streams(&mut daemon)?;
        match e {
            AfterburnerError::ProcessExit(code) => std::process::exit(code),
            other => {
                let _ = std::io::stderr().write_all(format!("burn: {other}\n").as_bytes());
                std::process::exit(1);
            }
        }
    }
    flush_streams(&mut daemon)?;
    // Nothing is holding the daemon open: behave like a one-shot script.
    if !daemon.has_refs() {
        rt.shutdown_timeout(Duration::from_secs(1));
        return Ok(());
    }
    let shutdown = Arc::new(AtomicBool::new(false));
    let inflight = Arc::new(AtomicUsize::new(0));
    {
        let shutdown = Arc::clone(&shutdown);
        rt.spawn(async move {
            // Only a delivered Ctrl-C requests shutdown. `ctrl_c()` resolves to `Err` when
            // the handler cannot be installed; treating that as a shutdown request would
            // stop the daemon immediately. This mirrors the SIGTERM task below, which also
            // does nothing when registration fails.
            if tokio::signal::ctrl_c().await.is_ok() {
                shutdown.store(true, Ordering::Release);
            }
        });
    }
    #[cfg(unix)]
    {
        let shutdown = Arc::clone(&shutdown);
        rt.spawn(async move {
            if let Ok(mut sigterm) =
                tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            {
                let _ = sigterm.recv().await;
                shutdown.store(true, Ordering::Release);
            }
        });
    }
    run_event_loop(&mut daemon, &daemon_http, &shutdown, &inflight)?;
    rt.shutdown_timeout(Duration::from_secs(2));
    flush_streams(&mut daemon)?;
    Ok(())
}
/// Assembles the `ScriptInvocation` passed to the script at compile time.
///
/// `argv` is `["burn", script_label, user_args...]`, `env` comes from [`collect_env`],
/// and `cwd` from `super::script::cli_cwd()`.
fn build_invocation(cli: &Cli, script_label: &str, user_args: &[String]) -> ScriptInvocation {
    let argv: Vec<String> = ["burn", script_label]
        .into_iter()
        .map(str::to_string)
        .chain(user_args.iter().cloned())
        .collect();
    let env = collect_env(cli);
    let cwd = super::script::cli_cwd();
    ScriptInvocation { argv, env, cwd }
}
/// Collects the environment variables the script is allowed to see.
///
/// The policy is read from the manifold built for `cli`:
/// * `EnvAccess::None` — an empty map;
/// * `EnvAccess::AllowList(keys)` — only the listed keys that are set and readable as
///   Unicode;
/// * `EnvAccess::Full` — every variable whose name and value are both valid Unicode.
///
/// Non-Unicode entries are skipped in both non-empty modes. `std::env::vars()` would
/// panic on such an entry, so the full snapshot is taken with `vars_os()` instead.
fn collect_env(cli: &Cli) -> BTreeMap<String, String> {
    let manifold = build_manifold(cli);
    match &manifold.env {
        EnvAccess::None => BTreeMap::new(),
        EnvAccess::AllowList(keys) => keys
            .iter()
            .filter_map(|k| std::env::var(k).ok().map(|v| (k.clone(), v)))
            .collect(),
        EnvAccess::Full => std::env::vars_os()
            .filter_map(|(key, value)| Some((key.into_string().ok()?, value.into_string().ok()?)))
            .collect(),
    }
}
fn run_event_loop(
daemon: &mut DaemonRuntime,
http: &Arc<DaemonHttp>,
shutdown: &Arc<AtomicBool>,
inflight: &Arc<AtomicUsize>,
) -> Result<()> {
while !shutdown.load(Ordering::Acquire) {
let mut did_work = false;
if let Some(event) = http.try_recv_event() {
did_work = true;
inflight.fetch_add(1, Ordering::Relaxed);
let envelope = event_to_envelope(&event);
let res = daemon.dispatch_event(envelope);
inflight.fetch_sub(1, Ordering::Relaxed);
flush_streams(daemon)?;
if let Err(e) = res {
if let AfterburnerError::ProcessExit(code) = &e {
std::process::exit(*code);
}
let _ =
std::io::stderr().write_all(format!("burn: dispatch error: {e}\n").as_bytes());
}
}
let fired = daemon.drain_expired_timers();
for timer_id in fired {
did_work = true;
let envelope = serde_json::json!({
"kind": "timer-fire",
"timer_id": timer_id,
});
let res = daemon.dispatch_event(envelope);
flush_streams(daemon)?;
if let Err(e) = res {
if let AfterburnerError::ProcessExit(code) = &e {
std::process::exit(*code);
}
let _ = std::io::stderr().write_all(format!("burn: timer error: {e}\n").as_bytes());
}
}
for _ in 0..256 {
let Some(evt) = daemon.try_recv_worker_event() else {
break;
};
did_work = true;
let (envelope, reap_id) = worker_event_to_envelope(&evt);
let res = daemon.dispatch_event(envelope);
flush_streams(daemon)?;
if let Err(e) = res {
if let AfterburnerError::ProcessExit(code) = &e {
std::process::exit(*code);
}
let _ = std::io::stderr()
.write_all(format!("burn: worker dispatch error: {e}\n").as_bytes());
}
if let Some(id) = reap_id {
daemon.reap_worker(id);
}
}
for _ in 0..256 {
let Some(evt) = daemon.try_recv_net_event() else {
break;
};
did_work = true;
let (envelope, reap_id) = net_event_to_envelope(&evt);
let res = daemon.dispatch_event(envelope);
flush_streams(daemon)?;
if let Err(e) = res {
if let AfterburnerError::ProcessExit(code) = &e {
std::process::exit(*code);
}
let _ = std::io::stderr()
.write_all(format!("burn: net dispatch error: {e}\n").as_bytes());
}
if let Some(id) = reap_id {
daemon.mark_net_closed(id);
}
}
for _ in 0..256 {
let Some(evt) = daemon.try_recv_tls_event() else {
break;
};
did_work = true;
let (envelope, reap_id) = tls_event_to_envelope(&evt);
let res = daemon.dispatch_event(envelope);
flush_streams(daemon)?;
if let Err(e) = res {
if let AfterburnerError::ProcessExit(code) = &e {
std::process::exit(*code);
}
let _ = std::io::stderr()
.write_all(format!("burn: tls dispatch error: {e}\n").as_bytes());
}
if let Some(id) = reap_id {
daemon.mark_tls_closed(id);
}
}
for _ in 0..256 {
let Some(evt) = daemon.try_recv_dgram_event() else {
break;
};
did_work = true;
let envelope = dgram_event_to_envelope(&evt);
let res = daemon.dispatch_event(envelope);
flush_streams(daemon)?;
if let Err(e) = res {
if let AfterburnerError::ProcessExit(code) = &e {
std::process::exit(*code);
}
let _ = std::io::stderr()
.write_all(format!("burn: dgram dispatch error: {e}\n").as_bytes());
}
}
if !daemon.has_refs() {
break;
}
if !did_work {
let max_sleep = Duration::from_millis(5);
let sleep_dur = daemon
.next_timer_deadline()
.map(|d| d.saturating_duration_since(Instant::now()).min(max_sleep))
.unwrap_or(max_sleep);
std::thread::sleep(sleep_dur);
}
}
Ok(())
}
fn event_to_envelope(event: &afterburner_wasi::daemon_http::DaemonEvent) -> serde_json::Value {
serde_json::json!({
"kind": "http-request",
"server_id": event.server_id,
"req_id": event.req_id,
"req": {
"method": event.method,
"url": event.url,
"headers": event.headers.iter().cloned().collect::<BTreeMap<_, _>>(),
"body": String::from_utf8_lossy(&event.body).into_owned(),
}
})
}
fn net_event_to_envelope(evt: &NetEvent) -> (serde_json::Value, Option<i32>) {
match evt {
NetEvent::Connect {
conn_id,
local,
remote,
} => (
serde_json::json!({
"kind": "net-connect",
"conn_id": conn_id,
"local": addr_json(local),
"remote": addr_json(remote),
}),
None,
),
NetEvent::Connection {
server_id,
conn_id,
local,
remote,
} => (
serde_json::json!({
"kind": "net-connection",
"server_id": server_id,
"conn_id": conn_id,
"local": addr_json(local),
"remote": addr_json(remote),
}),
None,
),
NetEvent::Data {
conn_id,
payload_b64,
} => (
serde_json::json!({
"kind": "net-data",
"conn_id": conn_id,
"payload_b64": payload_b64,
}),
None,
),
NetEvent::End { conn_id } => (
serde_json::json!({"kind": "net-end", "conn_id": conn_id}),
None,
),
NetEvent::Drain { conn_id } => (
serde_json::json!({"kind": "net-drain", "conn_id": conn_id}),
None,
),
NetEvent::Close { conn_id, had_error } => (
serde_json::json!({
"kind": "net-close",
"conn_id": conn_id,
"had_error": had_error,
}),
Some(*conn_id),
),
NetEvent::Error {
conn_id,
message,
code,
} => (
serde_json::json!({
"kind": "net-error",
"conn_id": conn_id,
"message": message,
"code": code,
}),
None,
),
NetEvent::Listening { server_id, port } => (
serde_json::json!({
"kind": "net-listening",
"server_id": server_id,
"port": port,
}),
None,
),
NetEvent::ServerError { server_id, message } => (
serde_json::json!({
"kind": "net-server-error",
"server_id": server_id,
"message": message,
}),
None,
),
}
}
fn tls_event_to_envelope(evt: &TlsEvent) -> (serde_json::Value, Option<i32>) {
match evt {
TlsEvent::Connect {
conn_id,
local,
remote,
alpn_protocol,
protocol,
authorized,
cipher,
peer_cert_chain_der,
} => (
serde_json::json!({
"kind": "tls-connect",
"conn_id": conn_id,
"local": addr_json(local),
"remote": addr_json(remote),
"alpn_protocol": alpn_protocol,
"protocol": protocol,
"authorized": authorized,
"cipher": cipher,
"peer_cert_chain_der_b64": cert_chain_to_b64(peer_cert_chain_der),
}),
None,
),
TlsEvent::Connection {
server_id,
conn_id,
local,
remote,
alpn_protocol,
protocol,
cipher,
peer_cert_chain_der,
} => (
serde_json::json!({
"kind": "tls-connection",
"server_id": server_id,
"conn_id": conn_id,
"local": addr_json(local),
"remote": addr_json(remote),
"alpn_protocol": alpn_protocol,
"protocol": protocol,
"cipher": cipher,
"peer_cert_chain_der_b64": cert_chain_to_b64(peer_cert_chain_der),
}),
None,
),
TlsEvent::Data {
conn_id,
payload_b64,
} => (
serde_json::json!({
"kind": "tls-data",
"conn_id": conn_id,
"payload_b64": payload_b64,
}),
None,
),
TlsEvent::End { conn_id } => (
serde_json::json!({"kind": "tls-end", "conn_id": conn_id}),
None,
),
TlsEvent::Drain { conn_id } => (
serde_json::json!({"kind": "tls-drain", "conn_id": conn_id}),
None,
),
TlsEvent::Close { conn_id, had_error } => (
serde_json::json!({
"kind": "tls-close",
"conn_id": conn_id,
"had_error": had_error,
}),
Some(*conn_id),
),
TlsEvent::Error {
conn_id,
message,
code,
} => (
serde_json::json!({
"kind": "tls-error",
"conn_id": conn_id,
"message": message,
"code": code,
}),
None,
),
TlsEvent::Listening { server_id, port } => (
serde_json::json!({
"kind": "tls-listening",
"server_id": server_id,
"port": port,
}),
None,
),
TlsEvent::ServerError { server_id, message } => (
serde_json::json!({
"kind": "tls-server-error",
"server_id": server_id,
"message": message,
}),
None,
),
}
}
/// Renders an optional socket address as `{ "address", "family", "port" }`, or JSON
/// `null` when no address is present. `family` is `"IPv4"` or `"IPv6"`.
fn addr_json(addr: &Option<std::net::SocketAddr>) -> serde_json::Value {
    let Some(sock) = addr else {
        return serde_json::Value::Null;
    };
    let family = match sock {
        std::net::SocketAddr::V4(_) => "IPv4",
        std::net::SocketAddr::V6(_) => "IPv6",
    };
    let address = sock.ip().to_string();
    serde_json::json!({
        "address": address,
        "family": family,
        "port": sock.port(),
    })
}
/// Encodes each certificate in `chain` with the standard base64 engine and returns them
/// as a JSON array of strings, in the same order. An empty chain yields an empty array.
fn cert_chain_to_b64(chain: &[Vec<u8>]) -> serde_json::Value {
    use base64::Engine as _;

    let mut encoded = Vec::with_capacity(chain.len());
    for der in chain {
        let text = base64::engine::general_purpose::STANDARD.encode(der);
        encoded.push(serde_json::Value::String(text));
    }
    serde_json::Value::Array(encoded)
}
fn worker_event_to_envelope(evt: &WorkerEvent) -> (serde_json::Value, Option<i32>) {
match evt {
WorkerEvent::Online { worker_id } => (
serde_json::json!({"kind": "worker-online", "worker_id": worker_id}),
None,
),
WorkerEvent::Message { worker_id, payload } => (
serde_json::json!({
"kind": "worker-message",
"worker_id": worker_id,
"payload": payload,
}),
None,
),
WorkerEvent::Error {
worker_id,
message,
stack,
} => (
serde_json::json!({
"kind": "worker-error",
"worker_id": worker_id,
"message": message,
"stack": stack,
}),
None,
),
WorkerEvent::Exit { worker_id, code } => (
serde_json::json!({
"kind": "worker-exit",
"worker_id": worker_id,
"code": code,
}),
Some(*worker_id),
),
WorkerEvent::ParentMessage { payload } => (
serde_json::json!({
"kind": "worker-parent-message",
"payload": payload,
}),
None,
),
WorkerEvent::TerminateRequested => (
serde_json::json!({"kind": "worker-terminate-requested"}),
None,
),
}
}
fn flush_streams(daemon: &mut DaemonRuntime) -> Result<()> {
let stdout = daemon.drain_stdout();
let stderr = daemon.drain_stderr();
thread_local! {
static HW_STDOUT: std::cell::Cell<usize> = const { std::cell::Cell::new(0) };
static HW_STDERR: std::cell::Cell<usize> = const { std::cell::Cell::new(0) };
}
let new_stdout_at = HW_STDOUT.with(|c| c.get());
let new_stderr_at = HW_STDERR.with(|c| c.get());
if stdout.len() > new_stdout_at {
std::io::stdout()
.write_all(&stdout[new_stdout_at..])
.context("write stdout")?;
HW_STDOUT.with(|c| c.set(stdout.len()));
}
if stderr.len() > new_stderr_at {
std::io::stderr()
.write_all(&stderr[new_stderr_at..])
.context("write stderr")?;
HW_STDERR.with(|c| c.set(stderr.len()));
}
Ok(())
}
/// Returns the label used for a script: its canonical path when that can be resolved,
/// otherwise the path exactly as given. Non-UTF-8 path components are converted lossily.
pub fn script_label(path: &Path) -> String {
    let resolved = match path.canonicalize() {
        Ok(absolute) => absolute,
        // Canonicalization failed (for example, the file does not exist): fall back to
        // the caller's spelling.
        Err(_) => return path.to_string_lossy().into_owned(),
    };
    resolved.to_string_lossy().into_owned()
}
/// Builds the source-transform hook given to the wasm host (`ts` feature enabled).
///
/// The hook receives a module's source and path. Paths that `crate::ts::is_typescript`
/// accepts go through `crate::ts::transpile`; everything else goes through
/// `crate::ts::lower_esm_js`. Errors are reported as their string form.
#[cfg(feature = "ts")]
pub(super) fn ts_transpile_hook() -> Option<afterburner_wasi::host::TranspileFn> {
    let hook = |source: &str, path: &str| -> Result<String, String> {
        let file = std::path::PathBuf::from(path);
        if !crate::ts::is_typescript(&file) {
            return crate::ts::lower_esm_js(source, &file).map_err(|e| e.to_string());
        }
        crate::ts::transpile(source, &file).map_err(|e| e.to_string())
    };
    Some(Arc::new(hook))
}
/// Without the `ts` feature there is no transform hook.
#[cfg(not(feature = "ts"))]
pub(super) fn ts_transpile_hook() -> Option<afterburner_wasi::host::TranspileFn> {
    Option::None
}
fn dgram_event_to_envelope(evt: &DgramEvent) -> serde_json::Value {
match evt {
DgramEvent::Listening { socket_id, port } => serde_json::json!({
"kind": "dgram-listening",
"socketId": socket_id,
"port": port,
}),
DgramEvent::Message {
socket_id,
from,
payload_b64,
} => {
let family = if from.is_ipv4() { "IPv4" } else { "IPv6" };
serde_json::json!({
"kind": "dgram-message",
"socketId": socket_id,
"payload": payload_b64,
"from": {
"address": from.ip().to_string(),
"port": from.port(),
"family": family,
},
})
}
DgramEvent::Error {
socket_id,
message,
code,
} => serde_json::json!({
"kind": "dgram-error",
"socketId": socket_id,
"message": message,
"code": code,
}),
DgramEvent::Close { socket_id } => serde_json::json!({
"kind": "dgram-close",
"socketId": socket_id,
}),
}
}