use crate::ab_event;
use crate::engine::Combustor;
use crate::error::{AfterburnerError, Result};
use crate::log::Level;
use crate::types::{FuelGauge, ScriptId, sha256};
use kovan_map::HopscotchMap;
use serde_json::Value;
use std::sync::Arc;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
pub trait BurnCacheBackend: Send + Sync {
fn fetch(&self, hash: &[u8; 32]) -> Result<Option<String>>;
fn publish(&self, hash: &[u8; 32], source: &str) -> Result<()>;
}
#[derive(Default)]
pub struct InProcessCacheBackend {
store: HopscotchMap<[u8; 32], String>,
}
impl InProcessCacheBackend {
pub fn new() -> Self {
Self::default()
}
pub fn shared() -> Arc<Self> {
Arc::new(Self::new())
}
}
impl BurnCacheBackend for InProcessCacheBackend {
fn fetch(&self, hash: &[u8; 32]) -> Result<Option<String>> {
Ok(self.store.get(hash))
}
fn publish(&self, hash: &[u8; 32], source: &str) -> Result<()> {
self.store.insert(*hash, source.to_string());
Ok(())
}
}
#[derive(Default)]
pub struct RegistryStats {
pub cache_hits: AtomicU64,
pub cache_misses: AtomicU64,
}
impl RegistryStats {
pub fn hits(&self) -> u64 {
self.cache_hits.load(Ordering::Relaxed)
}
pub fn misses(&self) -> u64 {
self.cache_misses.load(Ordering::Relaxed)
}
}
struct CompileCell {
result: OnceLock<std::result::Result<ScriptId, String>>,
}
impl CompileCell {
fn new() -> Arc<Self> {
Arc::new(Self {
result: OnceLock::new(),
})
}
}
pub struct BurnCache {
engine: Box<dyn Combustor>,
compiled: HopscotchMap<[u8; 32], Arc<CompileCell>>,
source_store: HopscotchMap<[u8; 32], String>,
backend: Option<Arc<dyn BurnCacheBackend>>,
stats: RegistryStats,
}
impl BurnCache {
pub fn new(engine: Box<dyn Combustor>) -> Self {
Self {
engine,
compiled: HopscotchMap::new(),
source_store: HopscotchMap::new(),
backend: None,
stats: RegistryStats::default(),
}
}
pub fn with_backend(mut self, backend: Arc<dyn BurnCacheBackend>) -> Self {
self.backend = Some(backend);
self
}
pub fn register_by_hash(&self, hash: &[u8; 32]) -> Result<ScriptId> {
if let Some(src) = self.source_store.get(hash) {
return self.register(&src);
}
let backend = self
.backend
.as_ref()
.ok_or(AfterburnerError::ScriptNotFound)?;
match backend.fetch(hash)? {
Some(src) => self.register(&src),
None => Err(AfterburnerError::ScriptNotFound),
}
}
#[fastrace::trace(name = "BurnCache::register")]
pub fn register(&self, source: &str) -> Result<ScriptId> {
let hash = sha256(source.as_bytes());
if let Some(cell) = self.compiled.get(&hash)
&& let Some(outcome) = cell.result.get()
{
self.stats.cache_hits.fetch_add(1, Ordering::Relaxed);
ab_event!(Level::Debug, "burn_cache.hit", "hash" => hex32(&hash));
return outcome_to_result(outcome);
}
let fresh = CompileCell::new();
self.compiled.insert_if_absent(hash, fresh.clone());
let cell = self.compiled.get(&hash).unwrap_or_else(|| fresh.clone());
let is_winner = Arc::ptr_eq(&cell, &fresh);
if is_winner {
self.stats.cache_misses.fetch_add(1, Ordering::Relaxed);
ab_event!(
Level::Info,
"burn_cache.miss",
"hash" => hex32(&hash),
"source_bytes" => source.len(),
);
self.source_store.insert(hash, source.to_string());
if let Some(b) = self.backend.as_ref()
&& let Err(e) = b.publish(&hash, source)
{
ab_event!(Level::Warn, "burn_cache.publish_failed", "error" => e.to_string());
}
let stored = match self.engine.ignite(source) {
Ok(id) => Ok(id),
Err(e) => {
ab_event!(Level::Warn, "burn_cache.compile_failed", "error" => e);
Err(e.to_string())
}
};
let _ = cell.result.set(stored.clone());
return match stored {
Ok(id) => Ok(id),
Err(msg) => Err(AfterburnerError::CompileFailed(msg)),
};
}
self.stats.cache_hits.fetch_add(1, Ordering::Relaxed);
ab_event!(
Level::Debug,
"burn_cache.wait_on_peer",
"hash" => hex32(&hash),
);
loop {
if let Some(outcome) = cell.result.get() {
return outcome_to_result(outcome);
}
thread::yield_now();
}
}
#[fastrace::trace(name = "BurnCache::execute")]
pub fn execute(&self, id: &ScriptId, input: &Value, limits: &FuelGauge) -> Result<Value> {
self.engine.thrust(id, input, limits)
}
#[fastrace::trace(name = "BurnCache::execute_raw")]
pub fn execute_raw(&self, id: &ScriptId, input: &[u8], limits: &FuelGauge) -> Result<Value> {
self.engine.thrust_raw(id, input, limits)
}
#[fastrace::trace(name = "BurnCache::execute_out")]
pub fn execute_out(
&self,
id: &ScriptId,
input: &Value,
limits: &FuelGauge,
) -> Result<crate::OutputValue> {
self.engine.thrust_out(id, input, limits)
}
#[fastrace::trace(name = "BurnCache::execute_raw_out")]
pub fn execute_raw_out(
&self,
id: &ScriptId,
input: &[u8],
limits: &FuelGauge,
) -> Result<crate::OutputValue> {
self.engine.thrust_raw_out(id, input, limits)
}
#[fastrace::trace(name = "BurnCache::run_script")]
pub fn run_script(
&self,
source: &str,
invocation: &crate::ScriptInvocation,
limits: &FuelGauge,
) -> Result<crate::ScriptOutcome> {
self.engine.run_script(source, invocation, limits)
}
#[fastrace::trace(name = "BurnCache::execute_batch")]
pub fn execute_batch(&self, id: &ScriptId, rows: &Value, limits: &FuelGauge) -> Result<Value> {
if !rows.is_array() {
return Err(AfterburnerError::Host(
"execute_batch: input must be a JSON array".into(),
));
}
let out = self.engine.thrust(id, rows, limits)?;
if !out.is_array() {
return Err(AfterburnerError::Host(format!(
"execute_batch: script must return an array; got {}",
type_name(&out)
)));
}
Ok(out)
}
#[fastrace::trace(name = "BurnCache::execute_columnar_bytes")]
pub fn execute_columnar_bytes(
&self,
id: &ScriptId,
encoded: &[u8],
limits: &FuelGauge,
) -> Result<Vec<u8>> {
self.engine.thrust_columnar_bytes(id, encoded, limits)
}
pub fn forget(&self, id: &ScriptId) {
self.compiled.remove(&id.hash);
self.source_store.remove(&id.hash);
self.engine.extinguish(id);
ab_event!(Level::Info, "burn_cache.forget", "hash" => hex32(&id.hash));
}
pub fn source(&self, id: &ScriptId) -> Option<String> {
self.source_store.get(&id.hash)
}
pub fn stats(&self) -> &RegistryStats {
&self.stats
}
}
fn outcome_to_result(o: &std::result::Result<ScriptId, String>) -> Result<ScriptId> {
match o {
Ok(id) => Ok(*id),
Err(msg) => Err(AfterburnerError::CompileFailed(msg.clone())),
}
}
pub fn hex32(hash: &[u8; 32]) -> String {
let mut s = String::with_capacity(16);
for b in &hash[..8] {
s.push_str(&format!("{b:02x}"));
}
s
}
fn type_name(v: &Value) -> &'static str {
match v {
Value::Null => "null",
Value::Bool(_) => "boolean",
Value::Number(_) => "number",
Value::String(_) => "string",
Value::Array(_) => "array",
Value::Object(_) => "object",
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::engine::Combustor;
use crate::types::EngineMode;
use serde_json::json;
#[derive(Default)]
struct MockCombustor {
ignite_count: AtomicU64,
thrust_count: AtomicU64,
last_thrust: HopscotchMap<u8, Value>,
}
impl Combustor for MockCombustor {
fn ignite(&self, source: &str) -> Result<ScriptId> {
self.ignite_count.fetch_add(1, Ordering::Relaxed);
Ok(ScriptId {
hash: sha256(source.as_bytes()),
mode: EngineMode::Native,
})
}
fn thrust(&self, _id: &ScriptId, input: &Value, _lim: &FuelGauge) -> Result<Value> {
self.thrust_count.fetch_add(1, Ordering::Relaxed);
self.last_thrust.insert(0u8, input.clone());
Ok(json!({"echo": input}))
}
fn extinguish(&self, _id: &ScriptId) {}
}
fn cache_with_mock() -> (BurnCache, std::sync::Arc<MockCombustor>) {
let mock = std::sync::Arc::new(MockCombustor::default());
struct Shim(std::sync::Arc<MockCombustor>);
impl Combustor for Shim {
fn ignite(&self, s: &str) -> Result<ScriptId> {
self.0.ignite(s)
}
fn thrust(&self, id: &ScriptId, i: &Value, l: &FuelGauge) -> Result<Value> {
self.0.thrust(id, i, l)
}
fn extinguish(&self, id: &ScriptId) {
self.0.extinguish(id)
}
}
(BurnCache::new(Box::new(Shim(mock.clone()))), mock)
}
#[test]
fn register_is_idempotent() {
let (cache, mock) = cache_with_mock();
let id1 = cache.register("module.exports = () => 1").unwrap();
let id2 = cache.register("module.exports = () => 1").unwrap();
assert_eq!(id1.hash, id2.hash);
assert_eq!(mock.ignite_count.load(Ordering::Relaxed), 1);
assert_eq!(cache.stats().hits(), 1);
assert_eq!(cache.stats().misses(), 1);
}
#[test]
fn different_sources_compile_separately() {
let (cache, mock) = cache_with_mock();
cache.register("module.exports = () => 1").unwrap();
cache.register("module.exports = () => 2").unwrap();
assert_eq!(mock.ignite_count.load(Ordering::Relaxed), 2);
assert_eq!(cache.stats().misses(), 2);
}
#[test]
fn execute_delegates_to_engine() {
let (cache, mock) = cache_with_mock();
let id = cache.register("module.exports = () => 1").unwrap();
let out = cache
.execute(&id, &json!({"x": 7}), &FuelGauge::unlimited())
.unwrap();
assert_eq!(out, json!({"echo": {"x": 7}}));
assert_eq!(mock.thrust_count.load(Ordering::Relaxed), 1);
}
#[test]
fn forget_removes_from_cache() {
let (cache, _mock) = cache_with_mock();
let id = cache.register("module.exports = () => 1").unwrap();
assert!(cache.source(&id).is_some());
cache.forget(&id);
assert!(cache.source(&id).is_none());
}
fn shared_backend_pair() -> (
BurnCache,
BurnCache,
std::sync::Arc<MockCombustor>,
std::sync::Arc<MockCombustor>,
std::sync::Arc<InProcessCacheBackend>,
) {
let mock_a = std::sync::Arc::new(MockCombustor::default());
let mock_b = std::sync::Arc::new(MockCombustor::default());
struct Shim(std::sync::Arc<MockCombustor>);
impl Combustor for Shim {
fn ignite(&self, s: &str) -> Result<ScriptId> {
self.0.ignite(s)
}
fn thrust(&self, id: &ScriptId, i: &Value, l: &FuelGauge) -> Result<Value> {
self.0.thrust(id, i, l)
}
fn extinguish(&self, id: &ScriptId) {
self.0.extinguish(id)
}
}
let backend = InProcessCacheBackend::shared();
let cache_a = BurnCache::new(Box::new(Shim(mock_a.clone())))
.with_backend(backend.clone() as std::sync::Arc<dyn BurnCacheBackend>);
let cache_b = BurnCache::new(Box::new(Shim(mock_b.clone())))
.with_backend(backend.clone() as std::sync::Arc<dyn BurnCacheBackend>);
(cache_a, cache_b, mock_a, mock_b, backend)
}
#[test]
fn register_publishes_to_backend() {
let (cache_a, _cache_b, _mock_a, _mock_b, backend) = shared_backend_pair();
let id = cache_a.register("module.exports = () => 99").unwrap();
let fetched = backend.fetch(&id.hash).unwrap();
assert_eq!(fetched.as_deref(), Some("module.exports = () => 99"));
}
#[test]
fn register_by_hash_resolves_via_shared_backend() {
let (cache_a, cache_b, _mock_a, mock_b, _backend) = shared_backend_pair();
let id_a = cache_a.register("module.exports = (d) => d + 1").unwrap();
let id_b = cache_b.register_by_hash(&id_a.hash).unwrap();
assert_eq!(id_a.hash, id_b.hash);
assert_eq!(mock_b.ignite_count.load(Ordering::Relaxed), 1);
}
#[test]
fn register_by_hash_without_backend_is_not_found() {
let (cache, _mock) = cache_with_mock();
let err = cache.register_by_hash(&[0xab; 32]).unwrap_err();
assert!(
matches!(err, AfterburnerError::ScriptNotFound),
"got: {err:?}"
);
}
#[test]
fn register_by_hash_prefers_local_source_over_backend() {
struct LoudBackend;
impl BurnCacheBackend for LoudBackend {
fn fetch(&self, _: &[u8; 32]) -> Result<Option<String>> {
panic!("backend.fetch should not be called on a local hit");
}
fn publish(&self, _: &[u8; 32], _: &str) -> Result<()> {
Ok(())
}
}
let mock = std::sync::Arc::new(MockCombustor::default());
struct Shim(std::sync::Arc<MockCombustor>);
impl Combustor for Shim {
fn ignite(&self, s: &str) -> Result<ScriptId> {
self.0.ignite(s)
}
fn thrust(&self, id: &ScriptId, i: &Value, l: &FuelGauge) -> Result<Value> {
self.0.thrust(id, i, l)
}
fn extinguish(&self, id: &ScriptId) {
self.0.extinguish(id)
}
}
let cache = BurnCache::new(Box::new(Shim(mock.clone())))
.with_backend(std::sync::Arc::new(LoudBackend));
let id = cache.register("module.exports = () => 7").unwrap();
let id2 = cache.register_by_hash(&id.hash).unwrap();
assert_eq!(id.hash, id2.hash);
}
#[test]
fn execute_batch_rejects_non_array_input() {
let (cache, _) = cache_with_mock();
let id = cache.register("module.exports = (r) => r").unwrap();
let err = cache
.execute_batch(&id, &json!({"x": 1}), &FuelGauge::unlimited())
.unwrap_err();
match err {
crate::AfterburnerError::Host(m) => {
assert!(m.contains("must be a JSON array"), "got: {m}");
}
other => panic!("expected Host error; got {other:?}"),
}
}
#[test]
fn execute_batch_rejects_non_array_output() {
let (cache, _) = cache_with_mock();
let id = cache.register("module.exports = (r) => r").unwrap();
let err = cache
.execute_batch(&id, &json!([{"n": 1}]), &FuelGauge::unlimited())
.unwrap_err();
match err {
crate::AfterburnerError::Host(m) => {
assert!(m.contains("must return an array"), "got: {m}");
}
other => panic!("expected Host error; got {other:?}"),
}
}
#[test]
fn concurrent_register_compiles_exactly_once_per_source() {
use std::thread;
let (cache, mock) = cache_with_mock();
let cache = std::sync::Arc::new(cache);
let mut handles = Vec::new();
for _ in 0..16 {
let c = cache.clone();
handles.push(thread::spawn(move || {
c.register("module.exports = () => 42").unwrap()
}));
}
let ids: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
assert!(ids.windows(2).all(|w| w[0].hash == w[1].hash));
assert_eq!(
mock.ignite_count.load(Ordering::Relaxed),
1,
"OnceLock dedup must collapse N concurrent registers into 1 ignite"
);
}
}