Skip to main content

algocline_engine/
state.rs

1//! Persistent key-value state backed by JSON files.
2//!
3//! ## Architecture
4//!
5//! All state operations go through the [`StateStore`] trait, which
6//! abstracts the storage backend.  The default implementation,
7//! [`JsonFileStore`], persists each namespace as a JSON file under a
8//! caller-provided root directory with atomic writes (tmp + rename).
9//!
10//! ## Tier 1 — Current API
11//!
12//! | Operation | Description |
13//! |-----------|-------------|
14//! | `get` | Read a value (returns `None` if absent) |
15//! | `set` | Write a value (upsert) |
16//! | `delete` | Remove a key (returns whether it existed) |
17//! | `keys` | List all keys in a namespace |
18//! | `has` | Check existence (cost is backend-dependent) |
19//! | `set_nx` | Set-if-not-exists (returns `false` if key already present) |
20//! | `incr` | Counter increment — single-process atomic (read-modify-write) |
21//!
22//! ## Tier 2 — Future Extensions (design notes, not yet implemented)
23//!
24//! The following operations are planned but **not yet implemented**.
25//! The trait is designed to accommodate them without breaking changes.
26//! Review this list when adding a new backend.
27//!
28//! - **TTL**: `set(key, value, opts)` with `opts.ttl_secs`, plus
29//!   `ttl(key) -> Option<Duration>` to query remaining time.  Useful
30//!   for caching patterns (e.g. Hub index cache, LLM response cache).
31//! - **Batch**: `mget(keys) -> Vec<Option<Value>>` and
32//!   `mset(pairs) -> Result<()>`.  Reduces I/O round-trips for
33//!   file/network backends.
34//! - **clear**: Flush all keys in a namespace.  OpenResty's
35//!   `flush_all` equivalent.
36//!
37//! ## Backend Swappability
38//!
39//! Because the engine interacts with state only through the
40//! [`StateStore`] trait, backends can be swapped without changing Lua
41//! code.  Planned backends:
42//!
43//! - `JsonFileStore` (current, default)
44//! - In-memory `HashMap` (for tests and short-lived sessions)
45//! - SQLite (for larger datasets with indexed queries)
46//! - Redis (for distributed / multi-process scenarios)
47
48use std::collections::HashMap;
49use std::fs;
50use std::path::{Path, PathBuf};
51use std::sync::{Arc, Mutex};
52
53use serde_json::Value;
54
55// ═══════════════════════════════════════════════════════════════
56// Trait
57// ═══════════════════════════════════════════════════════════════
58
59/// Backend-agnostic key-value state store.
60///
61/// All operations are namespace-scoped.  Implementations must be
62/// `Send + Sync` so they can be shared across Lua VMs (e.g. fork).
63pub trait StateStore: Send + Sync {
64    /// Read a value.  Returns `None` if the key does not exist.
65    fn get(&self, ns: &str, key: &str) -> Result<Option<Value>, String>;
66
67    /// Write a value (upsert).
68    fn set(&self, ns: &str, key: &str, value: Value) -> Result<(), String>;
69
70    /// Remove a key.  Returns `true` if it existed.
71    fn delete(&self, ns: &str, key: &str) -> Result<bool, String>;
72
73    /// List all keys in a namespace.
74    fn keys(&self, ns: &str) -> Result<Vec<String>, String>;
75
76    /// Check whether a key exists.
77    ///
78    /// Whether this is cheaper than `get` + nil check depends on the
79    /// backend.  `JsonFileStore` still loads the whole namespace; backends
80    /// like Redis or SQLite can answer with an `EXISTS` command.
81    fn has(&self, ns: &str, key: &str) -> Result<bool, String>;
82
83    /// Set a value only if the key does **not** already exist.
84    /// Returns `true` if the value was written, `false` if the key
85    /// was already present.
86    ///
87    /// **Note:** `JsonFileStore` serialises this operation per namespace
88    /// with an in-process `Mutex`, making it safe across concurrent tokio
89    /// tasks within the same process.  Cross-process atomicity still
90    /// requires a backend with native CAS (Redis `SETNX`, SQLite
91    /// transactions).
92    fn set_nx(&self, ns: &str, key: &str, value: Value) -> Result<bool, String>;
93
94    /// Counter increment, serialised per namespace within the same process.
95    ///
96    /// Adds `delta` to the current numeric value at `key`.  If the key
97    /// is missing, initialises it to `default` before adding.  Returns
98    /// the new value.
99    ///
100    /// `JsonFileStore` acquires a per-namespace `Mutex` for the full
101    /// read-modify-write cycle, preventing lost updates across concurrent
102    /// tokio tasks.  For multi-process safety use a backend with native
103    /// `INCR` (Redis) or transactions (SQLite).
104    ///
105    /// Uses `f64` internally.  Integer-valued deltas are exact; fractional
106    /// deltas may accumulate floating-point rounding errors over many calls.
107    ///
108    /// Errors if the existing value is not a JSON number.
109    fn incr(&self, ns: &str, key: &str, delta: f64, default: f64) -> Result<f64, String>;
110}
111
112// ═══════════════════════════════════════════════════════════════
113// JsonFileStore — default backend
114// ═══════════════════════════════════════════════════════════════
115
116/// JSON-file-backed state store.
117///
118/// Each namespace is a single JSON file at
119/// `{root}/{namespace}.json`.  Writes are atomic: the new state is
120/// written to a `.tmp` sibling and then renamed.
121///
122/// The root directory is provided at construction time; callers are
123/// expected to resolve it from the service-layer `AppDir` abstraction
124/// (typically `~/.algocline/state/`).
125///
126/// ## Concurrency
127///
128/// Per-namespace locks (`std::sync::Mutex`) prevent lost updates under
129/// concurrent `alc.state.*` calls within the same process.  The lock
130/// is acquired for the full load → mutate → atomic-rename cycle, so
131/// two tokio tasks operating on the **same** namespace are serialised.
132///
133/// Rationale for `std::sync::Mutex` over `tokio::sync::Mutex`:
134/// all I/O inside the lock uses `std::fs` (synchronous, no `.await`),
135/// so a standard mutex is sufficient and avoids holding a tokio mutex
136/// across potential scheduler context switches.
137///
138/// **Multi-process safety is NOT provided.**  If multiple `alc`
139/// processes share the same state directory (uncommon), use a backend
140/// with native `INCR` (Redis) or transactions (SQLite).
141pub struct JsonFileStore {
142    root: PathBuf,
143    /// Per-namespace locks.  Keyed by the resolved JSON file path so
144    /// that namespace validation is already applied before lookup.
145    locks: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>>,
146}
147
148impl JsonFileStore {
149    /// Construct a store rooted at an explicit path.
150    ///
151    /// The directory is **not** created eagerly; it is created lazily
152    /// on the first `set` / `set_nx` / `incr` call via [`Self::state_path`].
153    pub fn new(root: PathBuf) -> Self {
154        Self {
155            root,
156            locks: Mutex::new(HashMap::new()),
157        }
158    }
159
160    /// Acquire (or create) the per-namespace lock and return a guard.
161    ///
162    /// The returned `std::sync::MutexGuard` keeps the namespace lock
163    /// held for the duration of the caller's load → mutate → save cycle.
164    /// The outer `locks` map is released immediately after the inner
165    /// `Arc` is cloned, so contention on unrelated namespaces is zero.
166    fn ns_lock(&self, path: &Path) -> Result<Arc<Mutex<()>>, String> {
167        let mut map = self
168            .locks
169            .lock()
170            .map_err(|_| "state: locks map poisoned".to_string())?;
171        Ok(Arc::clone(
172            map.entry(path.to_path_buf())
173                .or_insert_with(|| Arc::new(Mutex::new(()))),
174        ))
175    }
176
177    /// Return the root directory this store writes under.
178    pub fn root(&self) -> &Path {
179        &self.root
180    }
181
182    /// Ensure the root directory exists, returning it.
183    fn ensure_root(&self) -> Result<&Path, String> {
184        if !self.root.exists() {
185            fs::create_dir_all(&self.root)
186                .map_err(|e| format!("Failed to create state dir: {e}"))?;
187        }
188        Ok(&self.root)
189    }
190
191    /// Resolve the JSON file path for a namespace, validating the name
192    /// and creating the root directory on demand.
193    pub fn state_path(&self, ns: &str) -> Result<PathBuf, String> {
194        if ns.contains('/')
195            || ns.contains('\\')
196            || ns.contains("..")
197            || ns.contains('\0')
198            || ns.is_empty()
199        {
200            return Err(format!("Invalid namespace: '{ns}'"));
201        }
202        let dir = self.ensure_root()?;
203        Ok(dir.join(format!("{ns}.json")))
204    }
205
206    fn load(&self, ns: &str) -> Result<HashMap<String, Value>, String> {
207        let path = self.state_path(ns)?;
208        if !path.exists() {
209            return Ok(HashMap::new());
210        }
211        let content =
212            fs::read_to_string(&path).map_err(|e| format!("Failed to read state '{ns}': {e}"))?;
213        serde_json::from_str(&content).map_err(|e| format!("Failed to parse state '{ns}': {e}"))
214    }
215
216    fn save(&self, ns: &str, data: &HashMap<String, Value>) -> Result<(), String> {
217        let path = self.state_path(ns)?;
218        let tmp = path.with_extension("json.tmp");
219        let content = serde_json::to_string_pretty(data)
220            .map_err(|e| format!("Failed to serialize state: {e}"))?;
221        fs::write(&tmp, &content).map_err(|e| format!("Failed to write state tmp: {e}"))?;
222        fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename state file: {e}"))?;
223        Ok(())
224    }
225}
226
227impl StateStore for JsonFileStore {
228    fn get(&self, ns: &str, key: &str) -> Result<Option<Value>, String> {
229        let path = self.state_path(ns)?;
230        let lock = self.ns_lock(&path)?;
231        let _guard = lock
232            .lock()
233            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
234        let state = self.load(ns)?;
235        Ok(state.get(key).cloned())
236    }
237
238    fn set(&self, ns: &str, key: &str, value: Value) -> Result<(), String> {
239        let path = self.state_path(ns)?;
240        let lock = self.ns_lock(&path)?;
241        let _guard = lock
242            .lock()
243            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
244        let mut state = self.load(ns)?;
245        state.insert(key.to_string(), value);
246        self.save(ns, &state)
247    }
248
249    fn delete(&self, ns: &str, key: &str) -> Result<bool, String> {
250        let path = self.state_path(ns)?;
251        let lock = self.ns_lock(&path)?;
252        let _guard = lock
253            .lock()
254            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
255        let mut state = self.load(ns)?;
256        let existed = state.remove(key).is_some();
257        if existed {
258            self.save(ns, &state)?;
259        }
260        Ok(existed)
261    }
262
263    fn keys(&self, ns: &str) -> Result<Vec<String>, String> {
264        let path = self.state_path(ns)?;
265        let lock = self.ns_lock(&path)?;
266        let _guard = lock
267            .lock()
268            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
269        let state = self.load(ns)?;
270        Ok(state.keys().cloned().collect())
271    }
272
273    fn has(&self, ns: &str, key: &str) -> Result<bool, String> {
274        let path = self.state_path(ns)?;
275        let lock = self.ns_lock(&path)?;
276        let _guard = lock
277            .lock()
278            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
279        let state = self.load(ns)?;
280        Ok(state.contains_key(key))
281    }
282
283    fn set_nx(&self, ns: &str, key: &str, value: Value) -> Result<bool, String> {
284        let path = self.state_path(ns)?;
285        let lock = self.ns_lock(&path)?;
286        let _guard = lock
287            .lock()
288            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
289        let mut state = self.load(ns)?;
290        if state.contains_key(key) {
291            return Ok(false);
292        }
293        state.insert(key.to_string(), value);
294        self.save(ns, &state)?;
295        Ok(true)
296    }
297
298    fn incr(&self, ns: &str, key: &str, delta: f64, default: f64) -> Result<f64, String> {
299        let path = self.state_path(ns)?;
300        let lock = self.ns_lock(&path)?;
301        let _guard = lock
302            .lock()
303            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
304        let mut state = self.load(ns)?;
305        let current = match state.get(key) {
306            Some(v) => v
307                .as_f64()
308                .ok_or_else(|| format!("incr: value at '{key}' is not a number"))?,
309            None => default,
310        };
311        let new_val = current + delta;
312        state.insert(key.to_string(), serde_json::json!(new_val));
313        self.save(ns, &state)?;
314        Ok(new_val)
315    }
316}
317
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Create a JsonFileStore rooted in a fresh tempdir, returning both
    /// so the TempDir guard lives for the test duration.
    fn new_store() -> (JsonFileStore, TempDir) {
        let tmp = tempfile::tempdir().unwrap();
        let store = JsonFileStore::new(tmp.path().to_path_buf());
        (store, tmp)
    }

    #[test]
    fn roundtrip() {
        let (store, _tmp) = new_store();
        let ns = "rt";

        store.set(ns, "count", serde_json::json!(42)).unwrap();
        store
            .set(ns, "name", serde_json::json!("algocline"))
            .unwrap();

        assert_eq!(store.get(ns, "count").unwrap(), Some(serde_json::json!(42)));
        assert_eq!(
            store.get(ns, "name").unwrap(),
            Some(serde_json::json!("algocline"))
        );
        assert_eq!(store.get(ns, "missing").unwrap(), None);

        let k = store.keys(ns).unwrap();
        assert!(k.contains(&"count".to_string()));
        assert!(k.contains(&"name".to_string()));

        assert!(store.delete(ns, "count").unwrap());
        assert!(!store.delete(ns, "count").unwrap());
        assert_eq!(store.get(ns, "count").unwrap(), None);
    }

    #[test]
    fn invalid_namespace() {
        let (store, _tmp) = new_store();
        assert!(store.state_path("../evil").is_err());
        assert!(store.state_path("foo/bar").is_err());
        assert!(store.state_path("foo\\bar").is_err());
        assert!(store.state_path("").is_err());
        assert!(store.state_path("foo\0bar").is_err());
    }

    #[test]
    fn get_nonexistent_namespace_returns_empty() {
        let (store, _tmp) = new_store();
        let result = store.get("ghost_ns", "any_key").unwrap();
        assert_eq!(result, None);
    }

    #[test]
    fn keys_nonexistent_namespace_returns_empty() {
        let (store, _tmp) = new_store();
        let result = store.keys("ghost_ns").unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn delete_nonexistent_key_returns_false() {
        let (store, _tmp) = new_store();
        assert!(!store.delete("delns", "nope").unwrap());
    }

    #[test]
    fn set_overwrites_existing_value() {
        let (store, _tmp) = new_store();
        let ns = "ow";

        store.set(ns, "k", serde_json::json!(1)).unwrap();
        store.set(ns, "k", serde_json::json!(2)).unwrap();
        assert_eq!(store.get(ns, "k").unwrap(), Some(serde_json::json!(2)));
    }

    #[test]
    fn state_path_valid_namespaces() {
        let (store, _tmp) = new_store();
        assert!(store.state_path("default").is_ok());
        assert!(store.state_path("my-app").is_ok());
        assert!(store.state_path("test_123").is_ok());
    }

    // ─── Tier 1: has / set_nx / incr ──────────────────────────

    #[test]
    fn has_returns_existence() {
        let (store, _tmp) = new_store();
        let ns = "hasns";

        assert!(!store.has(ns, "x").unwrap());
        store.set(ns, "x", serde_json::json!(1)).unwrap();
        assert!(store.has(ns, "x").unwrap());
    }

    #[test]
    fn has_and_keys_reflect_delete() {
        let (store, _tmp) = new_store();
        let ns = "hasdel";

        store.set(ns, "x", serde_json::json!(true)).unwrap();
        assert!(store.delete(ns, "x").unwrap());
        assert!(!store.has(ns, "x").unwrap());
        assert!(store.keys(ns).unwrap().is_empty());
    }

    #[test]
    fn set_nx_only_sets_if_absent() {
        let (store, _tmp) = new_store();
        let ns = "snx";

        assert!(store.set_nx(ns, "k", serde_json::json!("first")).unwrap());
        assert!(!store.set_nx(ns, "k", serde_json::json!("second")).unwrap());
        assert_eq!(
            store.get(ns, "k").unwrap(),
            Some(serde_json::json!("first")),
            "set_nx should not overwrite"
        );
    }

    #[test]
    fn incr_initialises_and_increments() {
        let (store, _tmp) = new_store();
        let ns = "inc";

        // Missing key: initialise from default (0) + delta (1) = 1
        let v = store.incr(ns, "counter", 1.0, 0.0).unwrap();
        assert!((v - 1.0).abs() < f64::EPSILON);

        // Increment existing
        let v = store.incr(ns, "counter", 5.0, 0.0).unwrap();
        assert!((v - 6.0).abs() < f64::EPSILON);

        // Negative delta
        let v = store.incr(ns, "counter", -2.0, 0.0).unwrap();
        assert!((v - 4.0).abs() < f64::EPSILON);
    }

    #[test]
    fn incr_rejects_non_numeric() {
        let (store, _tmp) = new_store();
        let ns = "incerr";

        store.set(ns, "s", serde_json::json!("hello")).unwrap();
        let err = store.incr(ns, "s", 1.0, 0.0).unwrap_err();
        assert!(err.contains("not a number"), "got: {err}");
    }

    #[test]
    fn incr_custom_default() {
        let (store, _tmp) = new_store();
        let ns = "incdef";

        let v = store.incr(ns, "score", 10.0, 100.0).unwrap();
        assert!((v - 110.0).abs() < f64::EPSILON, "100 + 10 = 110");
    }

    // ─── Concurrency ──────────────────────────────────────────

    /// Several threads incrementing one counter must not lose updates:
    /// the per-namespace lock serialises each load → add → save cycle.
    #[test]
    fn concurrent_incr_has_no_lost_updates() {
        const THREADS: u32 = 4;
        const PER_THREAD: u32 = 10;
        let (store, _tmp) = new_store();
        let ns = "conc";

        std::thread::scope(|s| {
            for _ in 0..THREADS {
                s.spawn(|| {
                    for _ in 0..PER_THREAD {
                        store.incr(ns, "hits", 1.0, 0.0).unwrap();
                    }
                });
            }
        });

        let total = store
            .get(ns, "hits")
            .unwrap()
            .and_then(|v| v.as_f64())
            .expect("counter must exist and be numeric");
        let expected = f64::from(THREADS * PER_THREAD);
        assert!((total - expected).abs() < f64::EPSILON, "got {total}, want {expected}");
    }
}
468
#[cfg(test)]
mod proptests {
    use super::*;
    use proptest::prelude::*;

    fn new_store() -> (JsonFileStore, tempfile::TempDir) {
        let tmp = tempfile::tempdir().unwrap();
        let store = JsonFileStore::new(tmp.path().to_path_buf());
        (store, tmp)
    }

    proptest! {
        /// Any valid namespace (alphanumeric + hyphen/underscore) and key
        /// round-trip an arbitrary integer through set/get.
        ///
        /// The fixed `ns` prefix keeps generated names clear of Windows
        /// reserved device names such as `CON` or `NUL`.
        #[test]
        fn roundtrip_arbitrary_values(
            ns in "ns[A-Za-z0-9_-]{0,18}",
            key in "[a-z]{1,20}",
            val in any::<i64>(),
        ) {
            let (store, _tmp) = new_store();
            let json_val = serde_json::json!(val);
            store.set(&ns, &key, json_val.clone()).unwrap();
            let got = store.get(&ns, &key).unwrap();
            prop_assert_eq!(got, Some(json_val));
        }

        /// Path traversal patterns are always rejected, whichever path
        /// separator they use.
        #[test]
        fn traversal_always_rejected(
            prefix in "[a-z]{0,5}",
            suffix in "[a-z]{0,5}",
            sep in prop::sample::select(vec!['/', '\\']),
        ) {
            let (store, _tmp) = new_store();
            let evil = format!("{prefix}{sep}..{sep}{suffix}");
            prop_assert!(store.state_path(&evil).is_err());
        }

        /// state_path rejects NUL bytes anywhere in the namespace.
        #[test]
        fn nul_byte_always_rejected(
            prefix in "[a-z]{0,10}",
            suffix in "[a-z]{0,10}",
        ) {
            let (store, _tmp) = new_store();
            let evil = format!("{prefix}\0{suffix}");
            prop_assert!(store.state_path(&evil).is_err());
        }
    }
}