Skip to main content

afterburner_core/
registry.rs

1// SPDX-License-Identifier: BUSL-1.1
2// Copyright (c) 2026 vertexclique
3// Licensed under the Business Source License 1.1.
4// Change Date: 4 years after this version's release. Change License: Apache-2.0.
5
6//! `BurnCache` - content-addressed script cache sitting in front of any
7//! `Combustor`. Compiles each unique source exactly once; `execute`
8//! delegates to the engine's `thrust` with per-call limits.
9
10use crate::ab_event;
11use crate::engine::Combustor;
12use crate::error::{AfterburnerError, Result};
13use crate::log::Level;
14use crate::types::{FuelGauge, ScriptId, sha256};
15use kovan_map::HopscotchMap;
16use serde_json::Value;
17use std::sync::Arc;
18use std::sync::OnceLock;
19use std::sync::atomic::{AtomicU64, Ordering};
20use std::thread;
21
22/// Pluggable storage for script source text, keyed by SHA-256 of the
23/// source. Enables distributed deployments where a single script is
24/// registered once on any node and replicated via an external
25/// coordinator (Redis, S3, NATS, etc.). Each node still compiles
26/// locally - the backend stores source text, not compiled modules,
27/// since the compiled form is engine-specific (wasmtime vs rquickjs)
28/// and not portably serializable today.
29///
30/// Trait objects must be `Send + Sync` because `BurnCache` is shared
31/// across threads.
32pub trait BurnCacheBackend: Send + Sync {
33    /// Look up the source for `hash`. `Ok(None)` means "not in this
34    /// backend" - BurnCache then expects the caller to supply the
35    /// source via `register(source)`. `Err(_)` is treated the same as
36    /// `Ok(None)` in the hot path - backend errors must never block
37    /// registration of a locally-available source.
38    fn fetch(&self, hash: &[u8; 32]) -> Result<Option<String>>;
39
40    /// Store `source` under `hash`. Called after a successful local
41    /// compile so peer nodes can look it up on their own registration.
42    /// Must be idempotent - a concurrent publisher racing this call
43    /// with the same hash is explicitly allowed.
44    fn publish(&self, hash: &[u8; 32], source: &str) -> Result<()>;
45}
46
47/// In-process default backend - no network involvement, state lives in
48/// a single lock-free map. Equivalent to the pre-Phase-G behavior.
49#[derive(Default)]
50pub struct InProcessCacheBackend {
51    store: HopscotchMap<[u8; 32], String>,
52}
53
54impl InProcessCacheBackend {
55    pub fn new() -> Self {
56        Self::default()
57    }
58
59    pub fn shared() -> Arc<Self> {
60        Arc::new(Self::new())
61    }
62}
63
64impl BurnCacheBackend for InProcessCacheBackend {
65    fn fetch(&self, hash: &[u8; 32]) -> Result<Option<String>> {
66        Ok(self.store.get(hash))
67    }
68
69    fn publish(&self, hash: &[u8; 32], source: &str) -> Result<()> {
70        self.store.insert(*hash, source.to_string());
71        Ok(())
72    }
73}
74
/// Hit/miss counters the cache exposes for observability.
///
/// Each counter is read with its own atomic load, so a reader gets no
/// consistent snapshot across the two fields.
#[derive(Default)]
pub struct RegistryStats {
    /// Registrations answered from an existing compile cell.
    pub cache_hits: AtomicU64,
    /// Registrations that had to perform the compile.
    pub cache_misses: AtomicU64,
}

impl RegistryStats {
    /// Current hit count (relaxed load).
    pub fn hits(&self) -> u64 {
        let counter = &self.cache_hits;
        counter.load(Ordering::Relaxed)
    }

    /// Current miss count (relaxed load).
    pub fn misses(&self) -> u64 {
        let counter = &self.cache_misses;
        counter.load(Ordering::Relaxed)
    }
}
91
92/// One-shot, wait-free cell for a single compile attempt. First writer
93/// fills the `OnceLock`; later waiters spin briefly on `Option::is_some()`
94/// (wait-free read) until the writer publishes.
95///
96/// Stored as `Arc<CompileCell>` inside `compiled` so every concurrent
97/// caller for a given hash shares the same cell.
98struct CompileCell {
99    /// Outcome of the compile. `Ok(id)` on success, `Err(msg)` on failure.
100    /// We keep the error as `String` because `AfterburnerError` is not
101    /// `Clone`; waiters that lose the race reconstruct a
102    /// `CompileFailed(msg)` with the same text.
103    result: OnceLock<std::result::Result<ScriptId, String>>,
104}
105
106impl CompileCell {
107    fn new() -> Arc<Self> {
108        Arc::new(Self {
109            result: OnceLock::new(),
110        })
111    }
112}
113
114/// Thread-safe compile-or-cache wrapper over a `Combustor`. Identical
115/// sources produce **exactly one** compile across concurrent callers;
116/// losers of the insert race wait on the same `OnceLock` rather than
117/// issuing a duplicate `ignite`. Hit path is wait-free.
118pub struct BurnCache {
119    engine: Box<dyn Combustor>,
120    compiled: HopscotchMap<[u8; 32], Arc<CompileCell>>,
121    source_store: HopscotchMap<[u8; 32], String>,
122    /// Optional cross-process / cross-node backend. Local-only builds
123    /// set this to `None` (equivalent to `InProcessCacheBackend`
124    /// behavior) so there's no hot-path branch unless the caller opted
125    /// into distributed caching.
126    backend: Option<Arc<dyn BurnCacheBackend>>,
127    stats: RegistryStats,
128}
129
130impl BurnCache {
131    pub fn new(engine: Box<dyn Combustor>) -> Self {
132        Self {
133            engine,
134            compiled: HopscotchMap::new(),
135            source_store: HopscotchMap::new(),
136            backend: None,
137            stats: RegistryStats::default(),
138        }
139    }
140
141    /// Attach a distributed cache backend. See [`BurnCacheBackend`].
142    /// When set, `register_by_hash` and `register` consult the backend
143    /// for a cache miss before treating it as a genuinely-new script.
144    pub fn with_backend(mut self, backend: Arc<dyn BurnCacheBackend>) -> Self {
145        self.backend = Some(backend);
146        self
147    }
148
149    /// Register a script when only its hash is known - the source is
150    /// fetched from the [`BurnCacheBackend`]. Returns
151    /// [`AfterburnerError::ScriptNotFound`] if no backend is attached
152    /// or the backend has no entry for `hash`.
153    ///
154    /// Useful for "worker nodes" in a distributed deployment that
155    /// receive only a script-id reference and must pull the source
156    /// from a shared store before running it.
157    pub fn register_by_hash(&self, hash: &[u8; 32]) -> Result<ScriptId> {
158        // Fast path: source already cached locally.
159        if let Some(src) = self.source_store.get(hash) {
160            return self.register(&src);
161        }
162        let backend = self
163            .backend
164            .as_ref()
165            .ok_or(AfterburnerError::ScriptNotFound)?;
166        match backend.fetch(hash)? {
167            Some(src) => self.register(&src),
168            None => Err(AfterburnerError::ScriptNotFound),
169        }
170    }
171
172    /// Compile-or-cache. Idempotent. Thread-safe. **At most one** `ignite`
173    /// per unique source across concurrent callers: the winner of the
174    /// insert race compiles, losers wait on the shared `OnceLock` and
175    /// observe the same outcome.
176    ///
177    /// The hit path (cell present, result already filled) is wait-free.
178    #[fastrace::trace(name = "BurnCache::register")]
179    pub fn register(&self, source: &str) -> Result<ScriptId> {
180        let hash = sha256(source.as_bytes());
181
182        // Fast hit: cell exists and the result is already published.
183        if let Some(cell) = self.compiled.get(&hash)
184            && let Some(outcome) = cell.result.get()
185        {
186            self.stats.cache_hits.fetch_add(1, Ordering::Relaxed);
187            ab_event!(Level::Debug, "burn_cache.hit", "hash" => hex32(&hash));
188            return outcome_to_result(outcome);
189        }
190
191        // Slow path: try to install a fresh cell. Whoever wins performs
192        // the compile; everyone else waits on the shared cell.
193        //
194        // kovan_map's `insert_if_absent` is *not* a strong CAS: the
195        // CAS-then-hop-bit window can let multiple racing inserts of
196        // the same key all return `None` (each thinking it just
197        // installed). The map then exposes the canonical (lowest-
198        // offset) entry via `get`. We follow the same pattern its own
199        // `get_or_insert` uses - install, then re-get to find the
200        // canonical entry - and decide winner via Arc pointer
201        // identity. Without this re-get, two threads could both run
202        // `engine.ignite` for the same source under contention.
203        let fresh = CompileCell::new();
204        self.compiled.insert_if_absent(hash, fresh.clone());
205        let cell = self.compiled.get(&hash).unwrap_or_else(|| fresh.clone());
206        let is_winner = Arc::ptr_eq(&cell, &fresh);
207
208        if is_winner {
209            self.stats.cache_misses.fetch_add(1, Ordering::Relaxed);
210            ab_event!(
211                Level::Info,
212                "burn_cache.miss",
213                "hash" => hex32(&hash),
214                "source_bytes" => source.len(),
215            );
216            self.source_store.insert(hash, source.to_string());
217            // Publish to the distributed backend (if attached) so peer
218            // nodes can fetch the source by hash. Publish failures are
219            // logged but don't abort registration - local compilation
220            // succeeded and we want the caller to keep working.
221            if let Some(b) = self.backend.as_ref()
222                && let Err(e) = b.publish(&hash, source)
223            {
224                ab_event!(Level::Warn, "burn_cache.publish_failed", "error" => e.to_string());
225            }
226            let stored = match self.engine.ignite(source) {
227                Ok(id) => Ok(id),
228                Err(e) => {
229                    ab_event!(Level::Warn, "burn_cache.compile_failed", "error" => e);
230                    Err(e.to_string())
231                }
232            };
233            // `set` can only fail if someone else already set - impossible
234            // because we're the sole writer via is_winner=true.
235            let _ = cell.result.set(stored.clone());
236            return match stored {
237                Ok(id) => Ok(id),
238                Err(msg) => Err(AfterburnerError::CompileFailed(msg)),
239            };
240        }
241
242        // Waiter path: spin (wait-free read) until the winner publishes.
243        self.stats.cache_hits.fetch_add(1, Ordering::Relaxed);
244        ab_event!(
245            Level::Debug,
246            "burn_cache.wait_on_peer",
247            "hash" => hex32(&hash),
248        );
249        loop {
250            if let Some(outcome) = cell.result.get() {
251                return outcome_to_result(outcome);
252            }
253            thread::yield_now();
254        }
255    }
256
257    /// Execute a compiled script. Creates an isolated invocation per
258    /// call - per-call `Store` in the WASM path, fresh interrupt handler
259    /// in the native path.
260    #[fastrace::trace(name = "BurnCache::execute")]
261    pub fn execute(&self, id: &ScriptId, input: &Value, limits: &FuelGauge) -> Result<Value> {
262        self.engine.thrust(id, input, limits)
263    }
264
265    /// Raw-input fast path. `input` reaches the module as a
266    /// `Uint8Array`; the JSON framing tax of [`execute`](Self::execute)
267    /// (host-side serialize + guest-side string materialization +
268    /// `JSON.parse`, all O(n) and fuel-metered on the guest side) is
269    /// skipped entirely. Output contract matches `execute` - the
270    /// script's return value comes back as JSON.
271    #[fastrace::trace(name = "BurnCache::execute_raw")]
272    pub fn execute_raw(&self, id: &ScriptId, input: &[u8], limits: &FuelGauge) -> Result<Value> {
273        self.engine.thrust_raw(id, input, limits)
274    }
275
276    /// Output-framing-aware execute: the module's return type picks
277    /// the result shape. See [`Combustor::thrust_out`].
278    #[fastrace::trace(name = "BurnCache::execute_out")]
279    pub fn execute_out(
280        &self,
281        id: &ScriptId,
282        input: &Value,
283        limits: &FuelGauge,
284    ) -> Result<crate::OutputValue> {
285        self.engine.thrust_out(id, input, limits)
286    }
287
288    /// Raw input + output-framing-aware result - the full-duplex bulk
289    /// path. See [`Combustor::thrust_raw_out`].
290    #[fastrace::trace(name = "BurnCache::execute_raw_out")]
291    pub fn execute_raw_out(
292        &self,
293        id: &ScriptId,
294        input: &[u8],
295        limits: &FuelGauge,
296    ) -> Result<crate::OutputValue> {
297        self.engine.thrust_raw_out(id, input, limits)
298    }
299
300    /// Run `source` as a top-level script (no UDF envelope). See
301    /// [`Combustor::run_script`] for semantics. Script-mode calls are
302    /// **not** cached - every invocation is a fresh compile + run.
303    /// Caching a script-mode script is almost never what the user
304    /// wants: Node-style scripts usually have side effects at top
305    /// level, and the host has no way to know whether a particular
306    /// re-run should re-execute those effects.
307    #[fastrace::trace(name = "BurnCache::run_script")]
308    pub fn run_script(
309        &self,
310        source: &str,
311        invocation: &crate::ScriptInvocation,
312        limits: &FuelGauge,
313    ) -> Result<crate::ScriptOutcome> {
314        self.engine.run_script(source, invocation, limits)
315    }
316
317    /// Array-in / array-out batch helper.
318    ///
319    /// Contract: `rows` must be a JSON array. The script receives the whole
320    /// array and must return an array - typically via
321    /// `module.exports = (rows) => rows.map(r => ({...}))`. The helper
322    /// enforces the shape and returns a typed error if either side
323    /// violates it.
324    #[fastrace::trace(name = "BurnCache::execute_batch")]
325    pub fn execute_batch(&self, id: &ScriptId, rows: &Value, limits: &FuelGauge) -> Result<Value> {
326        if !rows.is_array() {
327            return Err(AfterburnerError::Host(
328                "execute_batch: input must be a JSON array".into(),
329            ));
330        }
331        let out = self.engine.thrust(id, rows, limits)?;
332        if !out.is_array() {
333            return Err(AfterburnerError::Host(format!(
334                "execute_batch: script must return an array; got {}",
335                type_name(&out)
336            )));
337        }
338        Ok(out)
339    }
340
341    /// Columnar UDF entry point. Forwards the pre-encoded blob to the
342    /// underlying combustor via the trait method, which knows how to
343    /// dispatch into the wasm path. Decoding the reply blob back into
344    /// host-side `ColumnarOutput` is the caller's responsibility - the
345    /// `Afterburner::run_columnar` facade wraps this with the
346    /// `encode_batch` / `decode_batch` pair from `afterburner-wasi`.
347    #[fastrace::trace(name = "BurnCache::execute_columnar_bytes")]
348    pub fn execute_columnar_bytes(
349        &self,
350        id: &ScriptId,
351        encoded: &[u8],
352        limits: &FuelGauge,
353    ) -> Result<Vec<u8>> {
354        self.engine.thrust_columnar_bytes(id, encoded, limits)
355    }
356
357    /// Remove a compiled script from the cache. The engine's `extinguish`
358    /// is also called so backend-owned resources (wasmtime modules,
359    /// rquickjs source buffers) are released.
360    pub fn forget(&self, id: &ScriptId) {
361        self.compiled.remove(&id.hash);
362        self.source_store.remove(&id.hash);
363        self.engine.extinguish(id);
364        ab_event!(Level::Info, "burn_cache.forget", "hash" => hex32(&id.hash));
365    }
366
367    /// Retrieve the original source for a `ScriptId`, if still cached.
368    pub fn source(&self, id: &ScriptId) -> Option<String> {
369        self.source_store.get(&id.hash)
370    }
371
372    pub fn stats(&self) -> &RegistryStats {
373        &self.stats
374    }
375}
376
377/// Translate a published `CompileCell` outcome back into a `Result`.
378/// Waiters that lose the insert race see a cloned `ScriptId` on success,
379/// or a fresh `CompileFailed(msg)` carrying the winner's error text.
380fn outcome_to_result(o: &std::result::Result<ScriptId, String>) -> Result<ScriptId> {
381    match o {
382        Ok(id) => Ok(*id),
383        Err(msg) => Err(AfterburnerError::CompileFailed(msg.clone())),
384    }
385}
386
/// Render a 32-byte hash as a 16-char lowercase hex prefix - short
/// enough for log output, long enough to disambiguate scripts in
/// practice.
///
/// This is a *display* helper, not a content address: it encodes only the
/// first 8 bytes. Full-width content-addressing hex lives in `afterburner-afb`
/// (`digest::hex`); do not use this for cache keys or registry paths.
pub fn hex32(hash: &[u8; 32]) -> String {
    // Nibble lookup table: writes straight into the preallocated buffer
    // instead of building a temporary `String` per byte via `format!`.
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut s = String::with_capacity(16);
    for &b in &hash[..8] {
        s.push(char::from(DIGITS[usize::from(b >> 4)]));
        s.push(char::from(DIGITS[usize::from(b & 0x0f)]));
    }
    s
}
400
401fn type_name(v: &Value) -> &'static str {
402    match v {
403        Value::Null => "null",
404        Value::Bool(_) => "boolean",
405        Value::Number(_) => "number",
406        Value::String(_) => "string",
407        Value::Array(_) => "array",
408        Value::Object(_) => "object",
409    }
410}
411
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::Combustor;
    use crate::types::EngineMode;
    use serde_json::json;

    /// Minimal fake Combustor that records every `ignite` and `thrust`
    /// call so tests can assert idempotence / delegation.
    ///
    /// "Last thrust input" is stored in a lock-free `HopscotchMap`
    /// keyed by a unit sentinel (u8=0) - no Mutex in test code either.
    #[derive(Default)]
    struct MockCombustor {
        ignite_count: AtomicU64,
        thrust_count: AtomicU64,
        last_thrust: HopscotchMap<u8, Value>,
    }

    impl Combustor for MockCombustor {
        fn ignite(&self, source: &str) -> Result<ScriptId> {
            self.ignite_count.fetch_add(1, Ordering::Relaxed);
            Ok(ScriptId {
                hash: sha256(source.as_bytes()),
                mode: EngineMode::Native,
            })
        }
        fn thrust(&self, _id: &ScriptId, input: &Value, _lim: &FuelGauge) -> Result<Value> {
            self.thrust_count.fetch_add(1, Ordering::Relaxed);
            self.last_thrust.insert(0u8, input.clone());
            Ok(json!({"echo": input}))
        }
        fn extinguish(&self, _id: &ScriptId) {}
    }

    /// Forwarding adapter. `BurnCache` owns a `Box<dyn Combustor>`, but
    /// the tests also need to read the mock's counters afterwards, so
    /// the box holds this shim around a shared `Arc<MockCombustor>`.
    struct Shim(Arc<MockCombustor>);

    impl Combustor for Shim {
        fn ignite(&self, s: &str) -> Result<ScriptId> {
            self.0.ignite(s)
        }
        fn thrust(&self, id: &ScriptId, i: &Value, l: &FuelGauge) -> Result<Value> {
            self.0.thrust(id, i, l)
        }
        fn extinguish(&self, id: &ScriptId) {
            self.0.extinguish(id)
        }
    }

    /// Fresh mock plus a boxed engine that forwards to it.
    fn boxed_mock() -> (Box<dyn Combustor>, Arc<MockCombustor>) {
        let mock = Arc::new(MockCombustor::default());
        let engine: Box<dyn Combustor> = Box::new(Shim(Arc::clone(&mock)));
        (engine, mock)
    }

    /// Backend-less cache around a fresh mock engine.
    fn cache_with_mock() -> (BurnCache, Arc<MockCombustor>) {
        let (engine, mock) = boxed_mock();
        (BurnCache::new(engine), mock)
    }

    #[test]
    fn register_is_idempotent() {
        let (cache, mock) = cache_with_mock();
        let id1 = cache.register("module.exports = () => 1").unwrap();
        let id2 = cache.register("module.exports = () => 1").unwrap();
        assert_eq!(id1.hash, id2.hash);
        assert_eq!(mock.ignite_count.load(Ordering::Relaxed), 1);
        assert_eq!(cache.stats().hits(), 1);
        assert_eq!(cache.stats().misses(), 1);
    }

    #[test]
    fn different_sources_compile_separately() {
        let (cache, mock) = cache_with_mock();
        cache.register("module.exports = () => 1").unwrap();
        cache.register("module.exports = () => 2").unwrap();
        assert_eq!(mock.ignite_count.load(Ordering::Relaxed), 2);
        assert_eq!(cache.stats().misses(), 2);
    }

    #[test]
    fn execute_delegates_to_engine() {
        let (cache, mock) = cache_with_mock();
        let id = cache.register("module.exports = () => 1").unwrap();
        let out = cache
            .execute(&id, &json!({"x": 7}), &FuelGauge::unlimited())
            .unwrap();
        assert_eq!(out, json!({"echo": {"x": 7}}));
        assert_eq!(mock.thrust_count.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn forget_removes_from_cache() {
        let (cache, _mock) = cache_with_mock();
        let id = cache.register("module.exports = () => 1").unwrap();
        assert!(cache.source(&id).is_some());
        cache.forget(&id);
        assert!(cache.source(&id).is_none());
    }

    /// Build two BurnCache instances around independent MockCombustors
    /// but sharing a single BurnCacheBackend. Simulates a two-node
    /// cluster with a distributed source store.
    fn shared_backend_pair() -> (
        BurnCache,
        BurnCache,
        Arc<MockCombustor>,
        Arc<MockCombustor>,
        Arc<InProcessCacheBackend>,
    ) {
        let (engine_a, mock_a) = boxed_mock();
        let (engine_b, mock_b) = boxed_mock();
        let backend = InProcessCacheBackend::shared();
        let cache_a =
            BurnCache::new(engine_a).with_backend(backend.clone() as Arc<dyn BurnCacheBackend>);
        let cache_b =
            BurnCache::new(engine_b).with_backend(backend.clone() as Arc<dyn BurnCacheBackend>);
        (cache_a, cache_b, mock_a, mock_b, backend)
    }

    #[test]
    fn register_publishes_to_backend() {
        let (cache_a, _cache_b, _mock_a, _mock_b, backend) = shared_backend_pair();
        let id = cache_a.register("module.exports = () => 99").unwrap();
        // Backend got the source at the same hash.
        let fetched = backend.fetch(&id.hash).unwrap();
        assert_eq!(fetched.as_deref(), Some("module.exports = () => 99"));
    }

    #[test]
    fn register_by_hash_resolves_via_shared_backend() {
        // Node A registers a source. Node B knows only the hash and
        // asks BurnCache to materialize it. The shared backend
        // supplies the source; Node B still compiles locally (each
        // engine keeps its own compile state - source distribution is
        // what the backend gives us, not compiled modules).
        let (cache_a, cache_b, _mock_a, mock_b, _backend) = shared_backend_pair();
        let id_a = cache_a.register("module.exports = (d) => d + 1").unwrap();
        // Node B: only knows the hash.
        let id_b = cache_b.register_by_hash(&id_a.hash).unwrap();
        assert_eq!(id_a.hash, id_b.hash);
        // Node B compiled exactly once - its own mock shows one ignite.
        assert_eq!(mock_b.ignite_count.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn register_by_hash_without_backend_is_not_found() {
        let (cache, _mock) = cache_with_mock();
        // No backend attached; hash is bogus, source isn't locally
        // known - should surface ScriptNotFound, not a panic.
        let err = cache.register_by_hash(&[0xab; 32]).unwrap_err();
        assert!(
            matches!(err, AfterburnerError::ScriptNotFound),
            "got: {err:?}"
        );
    }

    #[test]
    fn register_by_hash_prefers_local_source_over_backend() {
        // If the source is already cached locally, register_by_hash
        // must not phone home. We enforce this via a test backend
        // whose fetch panics.
        struct LoudBackend;
        impl BurnCacheBackend for LoudBackend {
            fn fetch(&self, _: &[u8; 32]) -> Result<Option<String>> {
                panic!("backend.fetch should not be called on a local hit");
            }
            fn publish(&self, _: &[u8; 32], _: &str) -> Result<()> {
                Ok(())
            }
        }
        let (engine, _mock) = boxed_mock();
        let cache = BurnCache::new(engine).with_backend(Arc::new(LoudBackend));
        let id = cache.register("module.exports = () => 7").unwrap();
        // This call must short-circuit on source_store, not hit the backend.
        let id2 = cache.register_by_hash(&id.hash).unwrap();
        assert_eq!(id.hash, id2.hash);
    }

    #[test]
    fn execute_batch_rejects_non_array_input() {
        let (cache, _) = cache_with_mock();
        let id = cache.register("module.exports = (r) => r").unwrap();
        let err = cache
            .execute_batch(&id, &json!({"x": 1}), &FuelGauge::unlimited())
            .unwrap_err();
        match err {
            crate::AfterburnerError::Host(m) => {
                assert!(m.contains("must be a JSON array"), "got: {m}");
            }
            other => panic!("expected Host error; got {other:?}"),
        }
    }

    #[test]
    fn execute_batch_rejects_non_array_output() {
        // MockCombustor echoes input inside an object. Feeding an
        // array therefore yields {"echo": [...]} - not an array, so
        // execute_batch must reject.
        let (cache, _) = cache_with_mock();
        let id = cache.register("module.exports = (r) => r").unwrap();
        let err = cache
            .execute_batch(&id, &json!([{"n": 1}]), &FuelGauge::unlimited())
            .unwrap_err();
        match err {
            crate::AfterburnerError::Host(m) => {
                assert!(m.contains("must return an array"), "got: {m}");
            }
            other => panic!("expected Host error; got {other:?}"),
        }
    }

    #[test]
    fn concurrent_register_compiles_exactly_once_per_source() {
        // With the `OnceLock`-gated `CompileCell`, concurrent registrations
        // of the same source fan into a single `ignite`; losers of the
        // insert race wait on the shared cell and observe the same id.
        use std::thread;
        let (cache, mock) = cache_with_mock();
        let cache = Arc::new(cache);
        let mut handles = Vec::new();
        for _ in 0..16 {
            let c = cache.clone();
            handles.push(thread::spawn(move || {
                c.register("module.exports = () => 42").unwrap()
            }));
        }
        let ids: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        assert!(ids.windows(2).all(|w| w[0].hash == w[1].hash));
        assert_eq!(
            mock.ignite_count.load(Ordering::Relaxed),
            1,
            "OnceLock dedup must collapse N concurrent registers into 1 ignite"
        );
    }
}