1use crate::ab_event;
11use crate::engine::Combustor;
12use crate::error::{AfterburnerError, Result};
13use crate::log::Level;
14use crate::types::{FuelGauge, ScriptId, sha256};
15use kovan_map::HopscotchMap;
16use serde_json::Value;
17use std::sync::Arc;
18use std::sync::OnceLock;
19use std::sync::atomic::{AtomicU64, Ordering};
20use std::thread;
21
22pub trait BurnCacheBackend: Send + Sync {
33 fn fetch(&self, hash: &[u8; 32]) -> Result<Option<String>>;
39
40 fn publish(&self, hash: &[u8; 32], source: &str) -> Result<()>;
45}
46
47#[derive(Default)]
50pub struct InProcessCacheBackend {
51 store: HopscotchMap<[u8; 32], String>,
52}
53
54impl InProcessCacheBackend {
55 pub fn new() -> Self {
56 Self::default()
57 }
58
59 pub fn shared() -> Arc<Self> {
60 Arc::new(Self::new())
61 }
62}
63
64impl BurnCacheBackend for InProcessCacheBackend {
65 fn fetch(&self, hash: &[u8; 32]) -> Result<Option<String>> {
66 Ok(self.store.get(hash))
67 }
68
69 fn publish(&self, hash: &[u8; 32], source: &str) -> Result<()> {
70 self.store.insert(*hash, source.to_string());
71 Ok(())
72 }
73}
74
75#[derive(Default)]
78pub struct RegistryStats {
79 pub cache_hits: AtomicU64,
80 pub cache_misses: AtomicU64,
81}
82
83impl RegistryStats {
84 pub fn hits(&self) -> u64 {
85 self.cache_hits.load(Ordering::Relaxed)
86 }
87 pub fn misses(&self) -> u64 {
88 self.cache_misses.load(Ordering::Relaxed)
89 }
90}
91
92struct CompileCell {
99 result: OnceLock<std::result::Result<ScriptId, String>>,
104}
105
106impl CompileCell {
107 fn new() -> Arc<Self> {
108 Arc::new(Self {
109 result: OnceLock::new(),
110 })
111 }
112}
113
114pub struct BurnCache {
119 engine: Box<dyn Combustor>,
120 compiled: HopscotchMap<[u8; 32], Arc<CompileCell>>,
121 source_store: HopscotchMap<[u8; 32], String>,
122 backend: Option<Arc<dyn BurnCacheBackend>>,
127 stats: RegistryStats,
128}
129
130impl BurnCache {
131 pub fn new(engine: Box<dyn Combustor>) -> Self {
132 Self {
133 engine,
134 compiled: HopscotchMap::new(),
135 source_store: HopscotchMap::new(),
136 backend: None,
137 stats: RegistryStats::default(),
138 }
139 }
140
141 pub fn with_backend(mut self, backend: Arc<dyn BurnCacheBackend>) -> Self {
145 self.backend = Some(backend);
146 self
147 }
148
149 pub fn register_by_hash(&self, hash: &[u8; 32]) -> Result<ScriptId> {
158 if let Some(src) = self.source_store.get(hash) {
160 return self.register(&src);
161 }
162 let backend = self
163 .backend
164 .as_ref()
165 .ok_or(AfterburnerError::ScriptNotFound)?;
166 match backend.fetch(hash)? {
167 Some(src) => self.register(&src),
168 None => Err(AfterburnerError::ScriptNotFound),
169 }
170 }
171
172 #[fastrace::trace(name = "BurnCache::register")]
179 pub fn register(&self, source: &str) -> Result<ScriptId> {
180 let hash = sha256(source.as_bytes());
181
182 if let Some(cell) = self.compiled.get(&hash)
184 && let Some(outcome) = cell.result.get()
185 {
186 self.stats.cache_hits.fetch_add(1, Ordering::Relaxed);
187 ab_event!(Level::Debug, "burn_cache.hit", "hash" => hex32(&hash));
188 return outcome_to_result(outcome);
189 }
190
191 let fresh = CompileCell::new();
204 self.compiled.insert_if_absent(hash, fresh.clone());
205 let cell = self.compiled.get(&hash).unwrap_or_else(|| fresh.clone());
206 let is_winner = Arc::ptr_eq(&cell, &fresh);
207
208 if is_winner {
209 self.stats.cache_misses.fetch_add(1, Ordering::Relaxed);
210 ab_event!(
211 Level::Info,
212 "burn_cache.miss",
213 "hash" => hex32(&hash),
214 "source_bytes" => source.len(),
215 );
216 self.source_store.insert(hash, source.to_string());
217 if let Some(b) = self.backend.as_ref()
222 && let Err(e) = b.publish(&hash, source)
223 {
224 ab_event!(Level::Warn, "burn_cache.publish_failed", "error" => e.to_string());
225 }
226 let stored = match self.engine.ignite(source) {
227 Ok(id) => Ok(id),
228 Err(e) => {
229 ab_event!(Level::Warn, "burn_cache.compile_failed", "error" => e);
230 Err(e.to_string())
231 }
232 };
233 let _ = cell.result.set(stored.clone());
236 return match stored {
237 Ok(id) => Ok(id),
238 Err(msg) => Err(AfterburnerError::CompileFailed(msg)),
239 };
240 }
241
242 self.stats.cache_hits.fetch_add(1, Ordering::Relaxed);
244 ab_event!(
245 Level::Debug,
246 "burn_cache.wait_on_peer",
247 "hash" => hex32(&hash),
248 );
249 loop {
250 if let Some(outcome) = cell.result.get() {
251 return outcome_to_result(outcome);
252 }
253 thread::yield_now();
254 }
255 }
256
257 #[fastrace::trace(name = "BurnCache::execute")]
261 pub fn execute(&self, id: &ScriptId, input: &Value, limits: &FuelGauge) -> Result<Value> {
262 self.engine.thrust(id, input, limits)
263 }
264
265 #[fastrace::trace(name = "BurnCache::execute_raw")]
272 pub fn execute_raw(&self, id: &ScriptId, input: &[u8], limits: &FuelGauge) -> Result<Value> {
273 self.engine.thrust_raw(id, input, limits)
274 }
275
276 #[fastrace::trace(name = "BurnCache::execute_out")]
279 pub fn execute_out(
280 &self,
281 id: &ScriptId,
282 input: &Value,
283 limits: &FuelGauge,
284 ) -> Result<crate::OutputValue> {
285 self.engine.thrust_out(id, input, limits)
286 }
287
288 #[fastrace::trace(name = "BurnCache::execute_raw_out")]
291 pub fn execute_raw_out(
292 &self,
293 id: &ScriptId,
294 input: &[u8],
295 limits: &FuelGauge,
296 ) -> Result<crate::OutputValue> {
297 self.engine.thrust_raw_out(id, input, limits)
298 }
299
300 #[fastrace::trace(name = "BurnCache::run_script")]
308 pub fn run_script(
309 &self,
310 source: &str,
311 invocation: &crate::ScriptInvocation,
312 limits: &FuelGauge,
313 ) -> Result<crate::ScriptOutcome> {
314 self.engine.run_script(source, invocation, limits)
315 }
316
317 #[fastrace::trace(name = "BurnCache::execute_batch")]
325 pub fn execute_batch(&self, id: &ScriptId, rows: &Value, limits: &FuelGauge) -> Result<Value> {
326 if !rows.is_array() {
327 return Err(AfterburnerError::Host(
328 "execute_batch: input must be a JSON array".into(),
329 ));
330 }
331 let out = self.engine.thrust(id, rows, limits)?;
332 if !out.is_array() {
333 return Err(AfterburnerError::Host(format!(
334 "execute_batch: script must return an array; got {}",
335 type_name(&out)
336 )));
337 }
338 Ok(out)
339 }
340
341 #[fastrace::trace(name = "BurnCache::execute_columnar_bytes")]
348 pub fn execute_columnar_bytes(
349 &self,
350 id: &ScriptId,
351 encoded: &[u8],
352 limits: &FuelGauge,
353 ) -> Result<Vec<u8>> {
354 self.engine.thrust_columnar_bytes(id, encoded, limits)
355 }
356
357 pub fn forget(&self, id: &ScriptId) {
361 self.compiled.remove(&id.hash);
362 self.source_store.remove(&id.hash);
363 self.engine.extinguish(id);
364 ab_event!(Level::Info, "burn_cache.forget", "hash" => hex32(&id.hash));
365 }
366
367 pub fn source(&self, id: &ScriptId) -> Option<String> {
369 self.source_store.get(&id.hash)
370 }
371
372 pub fn stats(&self) -> &RegistryStats {
373 &self.stats
374 }
375}
376
377fn outcome_to_result(o: &std::result::Result<ScriptId, String>) -> Result<ScriptId> {
381 match o {
382 Ok(id) => Ok(*id),
383 Err(msg) => Err(AfterburnerError::CompileFailed(msg.clone())),
384 }
385}
386
387pub fn hex32(hash: &[u8; 32]) -> String {
394 let mut s = String::with_capacity(16);
395 for b in &hash[..8] {
396 s.push_str(&format!("{b:02x}"));
397 }
398 s
399}
400
401fn type_name(v: &Value) -> &'static str {
402 match v {
403 Value::Null => "null",
404 Value::Bool(_) => "boolean",
405 Value::Number(_) => "number",
406 Value::String(_) => "string",
407 Value::Array(_) => "array",
408 Value::Object(_) => "object",
409 }
410}
411
412#[cfg(test)]
413mod tests {
414 use super::*;
415 use crate::engine::Combustor;
416 use crate::types::EngineMode;
417 use serde_json::json;
418
419 #[derive(Default)]
425 struct MockCombustor {
426 ignite_count: AtomicU64,
427 thrust_count: AtomicU64,
428 last_thrust: HopscotchMap<u8, Value>,
429 }
430
431 impl Combustor for MockCombustor {
432 fn ignite(&self, source: &str) -> Result<ScriptId> {
433 self.ignite_count.fetch_add(1, Ordering::Relaxed);
434 Ok(ScriptId {
435 hash: sha256(source.as_bytes()),
436 mode: EngineMode::Native,
437 })
438 }
439 fn thrust(&self, _id: &ScriptId, input: &Value, _lim: &FuelGauge) -> Result<Value> {
440 self.thrust_count.fetch_add(1, Ordering::Relaxed);
441 self.last_thrust.insert(0u8, input.clone());
442 Ok(json!({"echo": input}))
443 }
444 fn extinguish(&self, _id: &ScriptId) {}
445 }
446
447 fn cache_with_mock() -> (BurnCache, std::sync::Arc<MockCombustor>) {
448 let mock = std::sync::Arc::new(MockCombustor::default());
449 struct Shim(std::sync::Arc<MockCombustor>);
452 impl Combustor for Shim {
453 fn ignite(&self, s: &str) -> Result<ScriptId> {
454 self.0.ignite(s)
455 }
456 fn thrust(&self, id: &ScriptId, i: &Value, l: &FuelGauge) -> Result<Value> {
457 self.0.thrust(id, i, l)
458 }
459 fn extinguish(&self, id: &ScriptId) {
460 self.0.extinguish(id)
461 }
462 }
463 (BurnCache::new(Box::new(Shim(mock.clone()))), mock)
464 }
465
466 #[test]
467 fn register_is_idempotent() {
468 let (cache, mock) = cache_with_mock();
469 let id1 = cache.register("module.exports = () => 1").unwrap();
470 let id2 = cache.register("module.exports = () => 1").unwrap();
471 assert_eq!(id1.hash, id2.hash);
472 assert_eq!(mock.ignite_count.load(Ordering::Relaxed), 1);
473 assert_eq!(cache.stats().hits(), 1);
474 assert_eq!(cache.stats().misses(), 1);
475 }
476
477 #[test]
478 fn different_sources_compile_separately() {
479 let (cache, mock) = cache_with_mock();
480 cache.register("module.exports = () => 1").unwrap();
481 cache.register("module.exports = () => 2").unwrap();
482 assert_eq!(mock.ignite_count.load(Ordering::Relaxed), 2);
483 assert_eq!(cache.stats().misses(), 2);
484 }
485
486 #[test]
487 fn execute_delegates_to_engine() {
488 let (cache, mock) = cache_with_mock();
489 let id = cache.register("module.exports = () => 1").unwrap();
490 let out = cache
491 .execute(&id, &json!({"x": 7}), &FuelGauge::unlimited())
492 .unwrap();
493 assert_eq!(out, json!({"echo": {"x": 7}}));
494 assert_eq!(mock.thrust_count.load(Ordering::Relaxed), 1);
495 }
496
497 #[test]
498 fn forget_removes_from_cache() {
499 let (cache, _mock) = cache_with_mock();
500 let id = cache.register("module.exports = () => 1").unwrap();
501 assert!(cache.source(&id).is_some());
502 cache.forget(&id);
503 assert!(cache.source(&id).is_none());
504 }
505
506 fn shared_backend_pair() -> (
510 BurnCache,
511 BurnCache,
512 std::sync::Arc<MockCombustor>,
513 std::sync::Arc<MockCombustor>,
514 std::sync::Arc<InProcessCacheBackend>,
515 ) {
516 let mock_a = std::sync::Arc::new(MockCombustor::default());
517 let mock_b = std::sync::Arc::new(MockCombustor::default());
518 struct Shim(std::sync::Arc<MockCombustor>);
519 impl Combustor for Shim {
520 fn ignite(&self, s: &str) -> Result<ScriptId> {
521 self.0.ignite(s)
522 }
523 fn thrust(&self, id: &ScriptId, i: &Value, l: &FuelGauge) -> Result<Value> {
524 self.0.thrust(id, i, l)
525 }
526 fn extinguish(&self, id: &ScriptId) {
527 self.0.extinguish(id)
528 }
529 }
530 let backend = InProcessCacheBackend::shared();
531 let cache_a = BurnCache::new(Box::new(Shim(mock_a.clone())))
532 .with_backend(backend.clone() as std::sync::Arc<dyn BurnCacheBackend>);
533 let cache_b = BurnCache::new(Box::new(Shim(mock_b.clone())))
534 .with_backend(backend.clone() as std::sync::Arc<dyn BurnCacheBackend>);
535 (cache_a, cache_b, mock_a, mock_b, backend)
536 }
537
538 #[test]
539 fn register_publishes_to_backend() {
540 let (cache_a, _cache_b, _mock_a, _mock_b, backend) = shared_backend_pair();
541 let id = cache_a.register("module.exports = () => 99").unwrap();
542 let fetched = backend.fetch(&id.hash).unwrap();
544 assert_eq!(fetched.as_deref(), Some("module.exports = () => 99"));
545 }
546
547 #[test]
548 fn register_by_hash_resolves_via_shared_backend() {
549 let (cache_a, cache_b, _mock_a, mock_b, _backend) = shared_backend_pair();
555 let id_a = cache_a.register("module.exports = (d) => d + 1").unwrap();
556 let id_b = cache_b.register_by_hash(&id_a.hash).unwrap();
558 assert_eq!(id_a.hash, id_b.hash);
559 assert_eq!(mock_b.ignite_count.load(Ordering::Relaxed), 1);
561 }
562
563 #[test]
564 fn register_by_hash_without_backend_is_not_found() {
565 let (cache, _mock) = cache_with_mock();
566 let err = cache.register_by_hash(&[0xab; 32]).unwrap_err();
569 assert!(
570 matches!(err, AfterburnerError::ScriptNotFound),
571 "got: {err:?}"
572 );
573 }
574
575 #[test]
576 fn register_by_hash_prefers_local_source_over_backend() {
577 struct LoudBackend;
581 impl BurnCacheBackend for LoudBackend {
582 fn fetch(&self, _: &[u8; 32]) -> Result<Option<String>> {
583 panic!("backend.fetch should not be called on a local hit");
584 }
585 fn publish(&self, _: &[u8; 32], _: &str) -> Result<()> {
586 Ok(())
587 }
588 }
589 let mock = std::sync::Arc::new(MockCombustor::default());
590 struct Shim(std::sync::Arc<MockCombustor>);
591 impl Combustor for Shim {
592 fn ignite(&self, s: &str) -> Result<ScriptId> {
593 self.0.ignite(s)
594 }
595 fn thrust(&self, id: &ScriptId, i: &Value, l: &FuelGauge) -> Result<Value> {
596 self.0.thrust(id, i, l)
597 }
598 fn extinguish(&self, id: &ScriptId) {
599 self.0.extinguish(id)
600 }
601 }
602 let cache = BurnCache::new(Box::new(Shim(mock.clone())))
603 .with_backend(std::sync::Arc::new(LoudBackend));
604 let id = cache.register("module.exports = () => 7").unwrap();
605 let id2 = cache.register_by_hash(&id.hash).unwrap();
607 assert_eq!(id.hash, id2.hash);
608 }
609
610 #[test]
611 fn execute_batch_rejects_non_array_input() {
612 let (cache, _) = cache_with_mock();
613 let id = cache.register("module.exports = (r) => r").unwrap();
614 let err = cache
615 .execute_batch(&id, &json!({"x": 1}), &FuelGauge::unlimited())
616 .unwrap_err();
617 match err {
618 crate::AfterburnerError::Host(m) => {
619 assert!(m.contains("must be a JSON array"), "got: {m}");
620 }
621 other => panic!("expected Host error; got {other:?}"),
622 }
623 }
624
625 #[test]
626 fn execute_batch_rejects_non_array_output() {
627 let (cache, _) = cache_with_mock();
631 let id = cache.register("module.exports = (r) => r").unwrap();
632 let err = cache
633 .execute_batch(&id, &json!([{"n": 1}]), &FuelGauge::unlimited())
634 .unwrap_err();
635 match err {
636 crate::AfterburnerError::Host(m) => {
637 assert!(m.contains("must return an array"), "got: {m}");
638 }
639 other => panic!("expected Host error; got {other:?}"),
640 }
641 }
642
643 #[test]
644 fn concurrent_register_compiles_exactly_once_per_source() {
645 use std::thread;
649 let (cache, mock) = cache_with_mock();
650 let cache = std::sync::Arc::new(cache);
651 let mut handles = Vec::new();
652 for _ in 0..16 {
653 let c = cache.clone();
654 handles.push(thread::spawn(move || {
655 c.register("module.exports = () => 42").unwrap()
656 }));
657 }
658 let ids: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
659 assert!(ids.windows(2).all(|w| w[0].hash == w[1].hash));
660 assert_eq!(
661 mock.ignite_count.load(Ordering::Relaxed),
662 1,
663 "OnceLock dedup must collapse N concurrent registers into 1 ignite"
664 );
665 }
666}