afterburner_core/state_store.rs
1// SPDX-License-Identifier: BUSL-1.1
2// Copyright (c) 2026 vertexclique
3// Licensed under the Business Source License 1.1.
4// Change Date: 4 years after this version's release. Change License: Apache-2.0.
5
6//! Cross-invocation key/value persistence - the `StateStore` trait.
7//!
8//! Scripts running in Afterburner are otherwise stateless: every thrust
9//! gets a fresh JS context. `StateStore` plugs in a small KV the script
10//! can read/write across calls (counters, deduplication caches, last-seen
11//! cursors, …). The default backend is in-memory and lives for the
12//! lifetime of the engine; embedders can drop in a Redis/SQLite/etc.
13//! implementation by depending on the trait.
14//!
15//! Capability gating: `Manifold` does not gate `StateStore` - the store
16//! is supplied by the host explicitly, so its presence and scope are
17//! deliberate. If you want to deny state access, install a no-op store.
18
19use kovan_map::HopscotchMap;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicI64, Ordering};
22
/// Key/value storage that survives from one script invocation to the next.
///
/// Every method is synchronous because the JS API
/// (`require('afterburner:state').get(key)`) is sync. The `Send + Sync`
/// supertraits are required so one store can serve concurrent thrusts.
///
/// `increment_i64` is part of the required surface rather than a
/// provided helper: the JS-side `state.increment` counter would race
/// unless the whole read-modify-write happens inside a single host
/// call. Implementations MUST apply the delta atomically relative to
/// other concurrent callers.
pub trait StateStore: Send + Sync {
    /// Looks up `key`, yielding its bytes or `None` when there is no value.
    fn get(&self, key: &str) -> Option<Vec<u8>>;

    /// Stores `value` under `key`.
    fn set(&self, key: &str, value: Vec<u8>);

    /// Removes whatever is stored under `key`.
    fn delete(&self, key: &str);

    /// Atomically adds `delta` to the signed integer stored under `key`
    /// (treated as 0 if absent) and returns the new value.
    ///
    /// Implementations must ensure that no reader observes a partial
    /// update under concurrent access.
    fn increment_i64(&self, key: &str, delta: i64) -> i64;

    /// Best-effort listing of keys that start with a prefix.
    ///
    /// This provided implementation ignores the prefix and returns an
    /// empty vec, which is what the default in-memory backend inherits.
    /// Embedders that need iteration should plug in a backend whose
    /// storage supports it and override this method.
    fn list_keys(&self, _prefix: &str) -> Vec<String> {
        Vec::default()
    }
}
48
/// Convenience shared handle: a reference-counted, type-erased
/// [`StateStore`].
///
/// `Arc<dyn StateStore>` is what gets stashed in `WasmCombustor` /
/// thread-local activator. Cloning the handle only bumps the reference
/// count, so every clone refers to the same underlying store.
pub type SharedStateStore = Arc<dyn StateStore>;
53
/// Default in-process backend backed by a lock-free `HopscotchMap`.
/// Suitable for single-process deployments; not durable across restarts.
///
/// Integer counters get their own `HopscotchMap<String, Arc<AtomicI64>>`
/// so `increment_i64` can update them with an atomic `fetch_add` instead
/// of RMW-racing through the byte-keyed bucket. Readers of the byte store
/// see the counter value as decimal ASCII on `get(key)` after an
/// increment - this mirrors the JS polyfill's `setJSON`/`getJSON`
/// contract.
#[derive(Default)]
pub struct InMemoryStateStore {
    /// Raw values written through `set`, keyed by the caller's key.
    bytes: HopscotchMap<String, Vec<u8>>,
    /// Counters created by `increment_i64`. While a key has an entry
    /// here, `get` reports the counter rather than the `bytes` entry.
    counters: HopscotchMap<String, Arc<AtomicI64>>,
}
67
68impl InMemoryStateStore {
69 pub fn new() -> Self {
70 Self::default()
71 }
72 pub fn shared() -> SharedStateStore {
73 Arc::new(Self::new())
74 }
75}
76
77impl StateStore for InMemoryStateStore {
78 fn get(&self, key: &str) -> Option<Vec<u8>> {
79 // Prefer the counter value so `increment` is observable.
80 if let Some(counter) = self.counters.get(key) {
81 return Some(counter.load(Ordering::Acquire).to_string().into_bytes());
82 }
83 self.bytes.get(key)
84 }
85 fn set(&self, key: &str, value: Vec<u8>) {
86 // Writing a fresh value clears any counter at the same key so
87 // `set` then `get` returns what was written.
88 self.counters.remove(key);
89 self.bytes.insert(key.to_string(), value);
90 }
91 fn delete(&self, key: &str) {
92 self.bytes.remove(key);
93 self.counters.remove(key);
94 }
95 fn increment_i64(&self, key: &str, delta: i64) -> i64 {
96 // Fast path: counter already exists.
97 if let Some(counter) = self.counters.get(key) {
98 return counter.fetch_add(delta, Ordering::AcqRel) + delta;
99 }
100 // Slow path: seed from any existing bytes value (parsed as
101 // decimal) then install the counter. Concurrent initializers
102 // race on `insert_if_absent` - the winner's counter survives.
103 let seed = self
104 .bytes
105 .get(key)
106 .and_then(|v| {
107 std::str::from_utf8(&v)
108 .ok()
109 .and_then(|s| s.parse::<i64>().ok())
110 })
111 .unwrap_or(0);
112 let fresh = Arc::new(AtomicI64::new(seed));
113 let counter = match self
114 .counters
115 .insert_if_absent(key.to_string(), fresh.clone())
116 {
117 None => fresh,
118 Some(existing) => existing,
119 };
120 counter.fetch_add(delta, Ordering::AcqRel) + delta
121 }
122}
123
#[cfg(test)]
mod tests {
    use super::*;

    /// Round trip through the byte store: absent, written, overwritten,
    /// deleted, absent again.
    #[test]
    fn in_memory_get_set_delete() {
        let store = InMemoryStateStore::new();
        assert_eq!(store.get("k"), None);

        store.set("k", b"v1".to_vec());
        assert_eq!(store.get("k"), Some(b"v1".to_vec()));

        store.set("k", b"v2".to_vec());
        assert_eq!(store.get("k"), Some(b"v2".to_vec()));

        store.delete("k");
        assert_eq!(store.get("k"), None);
    }

    /// 16 threads each bump the same counter 1000 times; no increment
    /// may be lost.
    #[test]
    fn increment_is_atomic_under_concurrency() {
        const WORKERS: usize = 16;
        const BUMPS_PER_WORKER: usize = 1000;

        let store = InMemoryStateStore::shared();
        let workers: Vec<_> = (0..WORKERS)
            .map(|_| {
                let handle = store.clone();
                std::thread::spawn(move || {
                    for _ in 0..BUMPS_PER_WORKER {
                        handle.increment_i64("hits", 1);
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }

        // The counter is read back through `get` as decimal ASCII.
        let raw = store.get("hits").unwrap();
        let total: i64 = std::str::from_utf8(&raw).unwrap().parse().unwrap();
        assert_eq!(total, 16_000);
    }

    /// A decimal byte value written with `set` is the starting point for
    /// the first increment.
    #[test]
    fn increment_seeds_from_set_value() {
        let store = InMemoryStateStore::new();
        store.set("k", b"5".to_vec());

        let after = store.increment_i64("k", 3);
        assert_eq!(after, 8);
        assert_eq!(store.get("k"), Some(b"8".to_vec()));
    }

    /// `set` replaces a counter, so the written bytes are what `get`
    /// returns afterwards.
    #[test]
    fn set_after_increment_clears_counter() {
        let store = InMemoryStateStore::new();
        store.increment_i64("k", 10);
        store.set("k", b"fresh".to_vec());

        let seen = store.get("k");
        assert_eq!(seen, Some(b"fresh".to_vec()));
    }
}