Skip to main content

afterburner_core/
state_store.rs

1// SPDX-License-Identifier: BUSL-1.1
2// Copyright (c) 2026 vertexclique
3// Licensed under the Business Source License 1.1.
4// Change Date: 4 years after this version's release. Change License: Apache-2.0.
5
6//! Cross-invocation key/value persistence - the `StateStore` trait.
7//!
8//! Scripts running in Afterburner are otherwise stateless: every thrust
9//! gets a fresh JS context. `StateStore` plugs in a small KV the script
10//! can read/write across calls (counters, deduplication caches, last-seen
11//! cursors, …). The default backend is in-memory and lives for the
12//! lifetime of the engine; embedders can drop in a Redis/SQLite/etc.
13//! implementation by depending on the trait.
14//!
15//! Capability gating: `Manifold` does not gate `StateStore` - the store
16//! is supplied by the host explicitly, so its presence and scope are
17//! deliberate. If you want to deny state access, install a no-op store.
18
19use kovan_map::HopscotchMap;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicI64, Ordering};
22
23/// Pluggable cross-invocation key/value storage.
24///
25/// All operations are synchronous because the JS API
26/// (`require('afterburner:state').get(key)`) is sync. Implementations
27/// must be `Send + Sync` to support concurrent thrusts.
28///
29/// `increment_i64` is a required primitive, not a convenience helper -
30/// the JS-side `state.increment` counter would race without a single
31/// host call. Implementations MUST apply the delta atomically
32/// relative to other concurrent callers.
33pub trait StateStore: Send + Sync {
34    fn get(&self, key: &str) -> Option<Vec<u8>>;
35    fn set(&self, key: &str, value: Vec<u8>);
36    fn delete(&self, key: &str);
37    /// Atomically add `delta` to the signed integer stored under `key`
38    /// (or 0 if absent) and return the new value. Implementations must
39    /// ensure no reader sees a partial update under concurrent access.
40    fn increment_i64(&self, key: &str, delta: i64) -> i64;
41    /// Best-effort prefix listing. The default in-memory backend
42    /// returns an empty vec - embedders that need iteration should
43    /// plug in a backend whose storage supports it.
44    fn list_keys(&self, _prefix: &str) -> Vec<String> {
45        Vec::new()
46    }
47}
48
49/// Convenience shared handle. The store is reference-counted and
50/// `Arc<dyn StateStore>` is what gets stashed in `WasmCombustor` /
51/// thread-local activator.
52pub type SharedStateStore = Arc<dyn StateStore>;
53
54/// Default in-process backend backed by a lock-free `HopscotchMap`.
55/// Suitable for single-process deployments; not durable across restarts.
56///
57/// Integer counters get their own `HopscotchMap<String, Arc<AtomicI64>>`
58/// so `increment_i64` can CAS atomically without RMW-racing through the
59/// byte-keyed bucket. Readers of the byte store see the counter value
60/// as decimal ASCII on `get(key)` after an increment - this mirrors the
61/// JS polyfill's `setJSON`/`getJSON` contract.
62#[derive(Default)]
63pub struct InMemoryStateStore {
64    bytes: HopscotchMap<String, Vec<u8>>,
65    counters: HopscotchMap<String, Arc<AtomicI64>>,
66}
67
68impl InMemoryStateStore {
69    pub fn new() -> Self {
70        Self::default()
71    }
72    pub fn shared() -> SharedStateStore {
73        Arc::new(Self::new())
74    }
75}
76
77impl StateStore for InMemoryStateStore {
78    fn get(&self, key: &str) -> Option<Vec<u8>> {
79        // Prefer the counter value so `increment` is observable.
80        if let Some(counter) = self.counters.get(key) {
81            return Some(counter.load(Ordering::Acquire).to_string().into_bytes());
82        }
83        self.bytes.get(key)
84    }
85    fn set(&self, key: &str, value: Vec<u8>) {
86        // Writing a fresh value clears any counter at the same key so
87        // `set` then `get` returns what was written.
88        self.counters.remove(key);
89        self.bytes.insert(key.to_string(), value);
90    }
91    fn delete(&self, key: &str) {
92        self.bytes.remove(key);
93        self.counters.remove(key);
94    }
95    fn increment_i64(&self, key: &str, delta: i64) -> i64 {
96        // Fast path: counter already exists.
97        if let Some(counter) = self.counters.get(key) {
98            return counter.fetch_add(delta, Ordering::AcqRel) + delta;
99        }
100        // Slow path: seed from any existing bytes value (parsed as
101        // decimal) then install the counter. Concurrent initializers
102        // race on `insert_if_absent` - the winner's counter survives.
103        let seed = self
104            .bytes
105            .get(key)
106            .and_then(|v| {
107                std::str::from_utf8(&v)
108                    .ok()
109                    .and_then(|s| s.parse::<i64>().ok())
110            })
111            .unwrap_or(0);
112        let fresh = Arc::new(AtomicI64::new(seed));
113        let counter = match self
114            .counters
115            .insert_if_absent(key.to_string(), fresh.clone())
116        {
117            None => fresh,
118            Some(existing) => existing,
119        };
120        counter.fetch_add(delta, Ordering::AcqRel) + delta
121    }
122}
123
124#[cfg(test)]
125mod tests {
126    use super::*;
127
128    #[test]
129    fn in_memory_get_set_delete() {
130        let s = InMemoryStateStore::new();
131        assert!(s.get("k").is_none());
132        s.set("k", b"v1".to_vec());
133        assert_eq!(s.get("k"), Some(b"v1".to_vec()));
134        s.set("k", b"v2".to_vec());
135        assert_eq!(s.get("k"), Some(b"v2".to_vec()));
136        s.delete("k");
137        assert!(s.get("k").is_none());
138    }
139
140    #[test]
141    fn increment_is_atomic_under_concurrency() {
142        use std::thread;
143        let store = InMemoryStateStore::shared();
144        let mut handles = Vec::new();
145        for _ in 0..16 {
146            let s = store.clone();
147            handles.push(thread::spawn(move || {
148                for _ in 0..1000 {
149                    s.increment_i64("hits", 1);
150                }
151            }));
152        }
153        for h in handles {
154            h.join().unwrap();
155        }
156        let final_val: i64 = std::str::from_utf8(&store.get("hits").unwrap())
157            .unwrap()
158            .parse()
159            .unwrap();
160        assert_eq!(final_val, 16_000);
161    }
162
163    #[test]
164    fn increment_seeds_from_set_value() {
165        let s = InMemoryStateStore::new();
166        s.set("k", b"5".to_vec());
167        assert_eq!(s.increment_i64("k", 3), 8);
168        assert_eq!(s.get("k"), Some(b"8".to_vec()));
169    }
170
171    #[test]
172    fn set_after_increment_clears_counter() {
173        let s = InMemoryStateStore::new();
174        s.increment_i64("k", 10);
175        s.set("k", b"fresh".to_vec());
176        assert_eq!(s.get("k"), Some(b"fresh".to_vec()));
177    }
178}