Skip to main content

algocline_engine/
state.rs

1//! Persistent key-value state backed by JSON files.
2//!
3//! ## Architecture
4//!
5//! All state operations go through the [`StateStore`] trait, which
6//! abstracts the storage backend.  The default implementation,
7//! [`JsonFileStore`], persists each namespace as a JSON file under a
8//! caller-provided root directory with atomic writes (tmp + rename).
9//!
10//! ## Tier 1 — Current API
11//!
12//! | Operation | Description |
13//! |-----------|-------------|
14//! | `get` | Read a value (returns `None` if absent) |
15//! | `set` | Write a value (upsert) |
16//! | `delete` | Remove a key (returns whether it existed) |
17//! | `keys` | List all keys in a namespace |
18//! | `has` | Check existence (cost is backend-dependent) |
19//! | `set_nx` | Set-if-not-exists (returns `false` if key already present) |
20//! | `incr` | Counter increment — single-process atomic (read-modify-write) |
21//!
22//! ## Tier 2 — Future Extensions (design notes, not yet implemented)
23//!
24//! The following operations are planned but **not yet implemented**.
25//! The trait is designed to accommodate them without breaking changes.
26//! Review this list when adding a new backend.
27//!
28//! - **TTL**: `set(key, value, opts)` with `opts.ttl_secs`, plus
29//!   `ttl(key) -> Option<Duration>` to query remaining time.  Useful
30//!   for caching patterns (e.g. Hub index cache, LLM response cache).
31//! - **Batch**: `mget(keys) -> Vec<Option<Value>>` and
32//!   `mset(pairs) -> Result<()>`.  Reduces I/O round-trips for
33//!   file/network backends.
34//! - **clear**: Flush all keys in a namespace.  OpenResty's
35//!   `flush_all` equivalent.
36//!
37//! ## Backend Swappability
38//!
39//! Because the engine interacts with state only through the
40//! [`StateStore`] trait, backends can be swapped without changing Lua
41//! code.  Planned backends:
42//!
43//! - `JsonFileStore` (current, default)
44//! - In-memory `HashMap` (for tests and short-lived sessions)
45//! - SQLite (for larger datasets with indexed queries)
46//! - Redis (for distributed / multi-process scenarios)
47
48use std::collections::HashMap;
49use std::fs;
50use std::path::{Path, PathBuf};
51use std::sync::{Arc, Mutex};
52
53use serde_json::Value;
54
55/// Whether a string is a safe path segment for `dispatch_path`.
56///
57/// Accepts ASCII alphanumerics, `_`, `-`, and `.` (single dots only —
58/// path traversal `..` and reserved names `.` are rejected). Empty
59/// strings and any other character (slash, backslash, NUL, control
60/// chars, whitespace) cause dispatch to fall back to legacy single-file
61/// storage.
62fn is_safe_segment(s: &str) -> bool {
63    if s.is_empty() || s == "." || s == ".." {
64        return false;
65    }
66    if s.contains("..") {
67        return false;
68    }
69    s.bytes()
70        .all(|b| b.is_ascii_alphanumeric() || b == b'_' || b == b'-' || b == b'.')
71}
72
73// ═══════════════════════════════════════════════════════════════
74// Trait
75// ═══════════════════════════════════════════════════════════════
76
77/// Backend-agnostic key-value state store.
78///
79/// All operations are namespace-scoped.  Implementations must be
80/// `Send + Sync` so they can be shared across Lua VMs (e.g. fork).
81pub trait StateStore: Send + Sync {
82    /// Read a value.  Returns `None` if the key does not exist.
83    fn get(&self, ns: &str, key: &str) -> Result<Option<Value>, String>;
84
85    /// Write a value (upsert).
86    fn set(&self, ns: &str, key: &str, value: Value) -> Result<(), String>;
87
88    /// Remove a key.  Returns `true` if it existed.
89    fn delete(&self, ns: &str, key: &str) -> Result<bool, String>;
90
91    /// List all keys in a namespace.
92    fn keys(&self, ns: &str) -> Result<Vec<String>, String>;
93
94    /// Check whether a key exists.
95    ///
96    /// Whether this is cheaper than `get` + nil check depends on the
97    /// backend.  `JsonFileStore` still loads the whole namespace; backends
98    /// like Redis or SQLite can answer with an `EXISTS` command.
99    fn has(&self, ns: &str, key: &str) -> Result<bool, String>;
100
101    /// Set a value only if the key does **not** already exist.
102    /// Returns `true` if the value was written, `false` if the key
103    /// was already present.
104    ///
105    /// **Note:** `JsonFileStore` serialises this operation per namespace
106    /// with an in-process `Mutex`, making it safe across concurrent tokio
107    /// tasks within the same process.  Cross-process atomicity still
108    /// requires a backend with native CAS (Redis `SETNX`, SQLite
109    /// transactions).
110    fn set_nx(&self, ns: &str, key: &str, value: Value) -> Result<bool, String>;
111
112    /// Counter increment, serialised per namespace within the same process.
113    ///
114    /// Adds `delta` to the current numeric value at `key`.  If the key
115    /// is missing, initialises it to `default` before adding.  Returns
116    /// the new value.
117    ///
118    /// `JsonFileStore` acquires a per-namespace `Mutex` for the full
119    /// read-modify-write cycle, preventing lost updates across concurrent
120    /// tokio tasks.  For multi-process safety use a backend with native
121    /// `INCR` (Redis) or transactions (SQLite).
122    ///
123    /// Uses `f64` internally.  Integer-valued deltas are exact; fractional
124    /// deltas may accumulate floating-point rounding errors over many calls.
125    ///
126    /// Errors if the existing value is not a JSON number.
127    fn incr(&self, ns: &str, key: &str, delta: f64, default: f64) -> Result<f64, String>;
128}
129
130// ═══════════════════════════════════════════════════════════════
131// JsonFileStore — default backend
132// ═══════════════════════════════════════════════════════════════
133
134/// JSON-file-backed state store.
135///
136/// Each namespace is a single JSON file at
137/// `{root}/{namespace}.json`.  Writes are atomic: the new state is
138/// written to a `.tmp` sibling and then renamed.
139///
140/// The root directory is provided at construction time; callers are
141/// expected to resolve it from the service-layer `AppDir` abstraction
142/// (typically `~/.algocline/state/`).
143///
144/// ## Concurrency
145///
146/// Per-namespace locks (`std::sync::Mutex`) prevent lost updates under
147/// concurrent `alc.state.*` calls within the same process.  The lock
148/// is acquired for the full load → mutate → atomic-rename cycle, so
149/// two tokio tasks operating on the **same** namespace are serialised.
150///
151/// Rationale for `std::sync::Mutex` over `tokio::sync::Mutex`:
152/// all I/O inside the lock uses `std::fs` (synchronous, no `.await`),
153/// so a standard mutex is sufficient and avoids holding a tokio mutex
154/// across potential scheduler context switches.
155///
156/// **Multi-process safety is NOT provided.**  If multiple `alc`
157/// processes share the same state directory (uncommon), use a backend
158/// with native `INCR` (Redis) or transactions (SQLite).
159pub struct JsonFileStore {
160    root: PathBuf,
161    /// Per-namespace locks.  Keyed by the resolved JSON file path so
162    /// that namespace validation is already applied before lookup.
163    locks: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>>,
164}
165
166impl JsonFileStore {
167    /// Construct a store rooted at an explicit path.
168    ///
169    /// The directory is **not** created eagerly; it is created lazily
170    /// on the first `set` / `set_nx` / `incr` call via [`Self::state_path`].
171    pub fn new(root: PathBuf) -> Self {
172        Self {
173            root,
174            locks: Mutex::new(HashMap::new()),
175        }
176    }
177
178    /// Acquire (or create) the per-namespace lock and return a guard.
179    ///
180    /// The returned `std::sync::MutexGuard` keeps the namespace lock
181    /// held for the duration of the caller's load → mutate → save cycle.
182    /// The outer `locks` map is released immediately after the inner
183    /// `Arc` is cloned, so contention on unrelated namespaces is zero.
184    fn ns_lock(&self, path: &Path) -> Result<Arc<Mutex<()>>, String> {
185        let mut map = self
186            .locks
187            .lock()
188            .map_err(|_| "state: locks map poisoned".to_string())?;
189        Ok(Arc::clone(
190            map.entry(path.to_path_buf())
191                .or_insert_with(|| Arc::new(Mutex::new(()))),
192        ))
193    }
194
195    /// Return the root directory this store writes under.
196    pub fn root(&self) -> &Path {
197        &self.root
198    }
199
200    /// Ensure the root directory exists, returning it.
201    fn ensure_root(&self) -> Result<&Path, String> {
202        if !self.root.exists() {
203            fs::create_dir_all(&self.root)
204                .map_err(|e| format!("Failed to create state dir: {e}"))?;
205        }
206        Ok(&self.root)
207    }
208
209    /// Resolve the JSON file path for a namespace, validating the name
210    /// and creating the root directory on demand.
211    pub fn state_path(&self, ns: &str) -> Result<PathBuf, String> {
212        if ns.contains('/')
213            || ns.contains('\\')
214            || ns.contains("..")
215            || ns.contains('\0')
216            || ns.is_empty()
217        {
218            return Err(format!("Invalid namespace: '{ns}'"));
219        }
220        let dir = self.ensure_root()?;
221        Ok(dir.join(format!("{ns}.json")))
222    }
223
224    /// Resolve the per-key dispatched file path for a `{prefix}:{id}`
225    /// shaped key, returning `None` when the key does not match the
226    /// dispatch contract (no `:`, multiple `:`, or unsafe characters
227    /// in either segment).
228    ///
229    /// Dispatched layout (issue #1776868812):
230    ///   `{root}/{prefix}/{id}.json` — file contents = the value as
231    ///   raw JSON (no wrapper map). Each `flow.state_save(state)` call
232    ///   becomes a single per-task file rather than another entry
233    ///   crammed into `default.json`.
234    ///
235    /// Legacy layout: keys without `:` (or with unsafe characters)
236    /// continue writing into `{ns}.json` so existing behaviour is
237    /// preserved without migration.
238    fn dispatch_path(&self, key: &str) -> Result<Option<PathBuf>, String> {
239        let (prefix, id) = match key.split_once(':') {
240            Some(pair) => pair,
241            None => return Ok(None),
242        };
243        if !is_safe_segment(prefix) || !is_safe_segment(id) {
244            return Ok(None);
245        }
246        let dir = self.ensure_root()?;
247        Ok(Some(dir.join(prefix).join(format!("{id}.json"))))
248    }
249
250    /// Read a dispatched value file and deserialize it as JSON.
251    /// Returns `Ok(None)` when the file does not exist.
252    fn load_dispatched(&self, path: &Path) -> Result<Option<Value>, String> {
253        if !path.exists() {
254            return Ok(None);
255        }
256        let content = fs::read_to_string(path)
257            .map_err(|e| format!("Failed to read dispatched state '{}': {e}", path.display()))?;
258        let v: Value = serde_json::from_str(&content)
259            .map_err(|e| format!("Failed to parse dispatched state '{}': {e}", path.display()))?;
260        Ok(Some(v))
261    }
262
263    /// Atomically write a value to a dispatched file (tmp + rename).
264    /// Creates the prefix subdirectory if missing.
265    fn save_dispatched(&self, path: &Path, value: &Value) -> Result<(), String> {
266        if let Some(parent) = path.parent() {
267            if !parent.exists() {
268                fs::create_dir_all(parent).map_err(|e| {
269                    format!(
270                        "Failed to create dispatched state dir '{}': {e}",
271                        parent.display()
272                    )
273                })?;
274            }
275        }
276        let tmp = path.with_extension("json.tmp");
277        let content = serde_json::to_string_pretty(value)
278            .map_err(|e| format!("Failed to serialize dispatched state: {e}"))?;
279        fs::write(&tmp, &content)
280            .map_err(|e| format!("Failed to write dispatched state tmp: {e}"))?;
281        fs::rename(&tmp, path)
282            .map_err(|e| format!("Failed to rename dispatched state file: {e}"))?;
283        Ok(())
284    }
285
286    fn load(&self, ns: &str) -> Result<HashMap<String, Value>, String> {
287        let path = self.state_path(ns)?;
288        if !path.exists() {
289            return Ok(HashMap::new());
290        }
291        let content =
292            fs::read_to_string(&path).map_err(|e| format!("Failed to read state '{ns}': {e}"))?;
293        serde_json::from_str(&content).map_err(|e| format!("Failed to parse state '{ns}': {e}"))
294    }
295
296    fn save(&self, ns: &str, data: &HashMap<String, Value>) -> Result<(), String> {
297        let path = self.state_path(ns)?;
298        let tmp = path.with_extension("json.tmp");
299        let content = serde_json::to_string_pretty(data)
300            .map_err(|e| format!("Failed to serialize state: {e}"))?;
301        fs::write(&tmp, &content).map_err(|e| format!("Failed to write state tmp: {e}"))?;
302        fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename state file: {e}"))?;
303        Ok(())
304    }
305}
306
307impl StateStore for JsonFileStore {
308    fn get(&self, ns: &str, key: &str) -> Result<Option<Value>, String> {
309        // Dispatched path takes precedence — when present, that file is
310        // the canonical source. Falls back to the legacy `{ns}.json`
311        // store so existing entries written before dispatch was enabled
312        // remain readable without migration.
313        if let Some(dpath) = self.dispatch_path(key)? {
314            let lock = self.ns_lock(&dpath)?;
315            let _guard = lock
316                .lock()
317                .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
318            if let Some(v) = self.load_dispatched(&dpath)? {
319                return Ok(Some(v));
320            }
321            // Fall through to legacy lookup so pre-dispatch values remain
322            // visible until the next set() promotes them.
323        }
324        let path = self.state_path(ns)?;
325        let lock = self.ns_lock(&path)?;
326        let _guard = lock
327            .lock()
328            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
329        let state = self.load(ns)?;
330        Ok(state.get(key).cloned())
331    }
332
333    fn set(&self, ns: &str, key: &str, value: Value) -> Result<(), String> {
334        if let Some(dpath) = self.dispatch_path(key)? {
335            let lock = self.ns_lock(&dpath)?;
336            let _guard = lock
337                .lock()
338                .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
339            return self.save_dispatched(&dpath, &value);
340        }
341        let path = self.state_path(ns)?;
342        let lock = self.ns_lock(&path)?;
343        let _guard = lock
344            .lock()
345            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
346        let mut state = self.load(ns)?;
347        state.insert(key.to_string(), value);
348        self.save(ns, &state)
349    }
350
351    fn delete(&self, ns: &str, key: &str) -> Result<bool, String> {
352        if let Some(dpath) = self.dispatch_path(key)? {
353            let lock = self.ns_lock(&dpath)?;
354            let _guard = lock
355                .lock()
356                .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
357            if dpath.exists() {
358                fs::remove_file(&dpath).map_err(|e| {
359                    format!(
360                        "Failed to delete dispatched state '{}': {e}",
361                        dpath.display()
362                    )
363                })?;
364                return Ok(true);
365            }
366            // Fall through to legacy delete in case the entry only exists
367            // in the legacy single-file store.
368        }
369        let path = self.state_path(ns)?;
370        let lock = self.ns_lock(&path)?;
371        let _guard = lock
372            .lock()
373            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
374        let mut state = self.load(ns)?;
375        let existed = state.remove(key).is_some();
376        if existed {
377            self.save(ns, &state)?;
378        }
379        Ok(existed)
380    }
381
382    fn keys(&self, ns: &str) -> Result<Vec<String>, String> {
383        let path = self.state_path(ns)?;
384        let lock = self.ns_lock(&path)?;
385        let _guard = lock
386            .lock()
387            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
388        let state = self.load(ns)?;
389        Ok(state.keys().cloned().collect())
390    }
391
392    fn has(&self, ns: &str, key: &str) -> Result<bool, String> {
393        if let Some(dpath) = self.dispatch_path(key)? {
394            let lock = self.ns_lock(&dpath)?;
395            let _guard = lock
396                .lock()
397                .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
398            if dpath.exists() {
399                return Ok(true);
400            }
401            // Fall through to legacy check.
402        }
403        let path = self.state_path(ns)?;
404        let lock = self.ns_lock(&path)?;
405        let _guard = lock
406            .lock()
407            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
408        let state = self.load(ns)?;
409        Ok(state.contains_key(key))
410    }
411
412    fn set_nx(&self, ns: &str, key: &str, value: Value) -> Result<bool, String> {
413        if let Some(dpath) = self.dispatch_path(key)? {
414            let lock = self.ns_lock(&dpath)?;
415            let _guard = lock
416                .lock()
417                .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
418            if dpath.exists() {
419                return Ok(false);
420            }
421            // Also honour any legacy entry to preserve set_nx semantics
422            // across the migration boundary.
423            let path = self.state_path(ns)?;
424            if path.exists() {
425                let state = self.load(ns)?;
426                if state.contains_key(key) {
427                    return Ok(false);
428                }
429            }
430            self.save_dispatched(&dpath, &value)?;
431            return Ok(true);
432        }
433        let path = self.state_path(ns)?;
434        let lock = self.ns_lock(&path)?;
435        let _guard = lock
436            .lock()
437            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
438        let mut state = self.load(ns)?;
439        if state.contains_key(key) {
440            return Ok(false);
441        }
442        state.insert(key.to_string(), value);
443        self.save(ns, &state)?;
444        Ok(true)
445    }
446
447    fn incr(&self, ns: &str, key: &str, delta: f64, default: f64) -> Result<f64, String> {
448        if let Some(dpath) = self.dispatch_path(key)? {
449            let lock = self.ns_lock(&dpath)?;
450            let _guard = lock
451                .lock()
452                .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
453            let current = if let Some(v) = self.load_dispatched(&dpath)? {
454                v.as_f64()
455                    .ok_or_else(|| format!("incr: value at '{key}' is not a number"))?
456            } else {
457                // Fall back to any legacy value so incr stays monotonic
458                // across the dispatch transition.
459                let path = self.state_path(ns)?;
460                if path.exists() {
461                    let state = self.load(ns)?;
462                    match state.get(key) {
463                        Some(v) => v
464                            .as_f64()
465                            .ok_or_else(|| format!("incr: value at '{key}' is not a number"))?,
466                        None => default,
467                    }
468                } else {
469                    default
470                }
471            };
472            let new_val = current + delta;
473            self.save_dispatched(&dpath, &serde_json::json!(new_val))?;
474            return Ok(new_val);
475        }
476        let path = self.state_path(ns)?;
477        let lock = self.ns_lock(&path)?;
478        let _guard = lock
479            .lock()
480            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
481        let mut state = self.load(ns)?;
482        let current = match state.get(key) {
483            Some(v) => v
484                .as_f64()
485                .ok_or_else(|| format!("incr: value at '{key}' is not a number"))?,
486            None => default,
487        };
488        let new_val = current + delta;
489        state.insert(key.to_string(), serde_json::json!(new_val));
490        self.save(ns, &state)?;
491        Ok(new_val)
492    }
493}
494
495#[cfg(test)]
496mod tests {
497    use super::*;
498    use tempfile::TempDir;
499
500    /// Create a JsonFileStore rooted in a fresh tempdir, returning both
501    /// so the TempDir guard lives for the test duration.
502    fn new_store() -> (JsonFileStore, TempDir) {
503        let tmp = tempfile::tempdir().unwrap();
504        let store = JsonFileStore::new(tmp.path().to_path_buf());
505        (store, tmp)
506    }
507
508    #[test]
509    fn roundtrip() {
510        let (store, _tmp) = new_store();
511        let ns = "rt";
512
513        store.set(ns, "count", serde_json::json!(42)).unwrap();
514        store
515            .set(ns, "name", serde_json::json!("algocline"))
516            .unwrap();
517
518        assert_eq!(store.get(ns, "count").unwrap(), Some(serde_json::json!(42)));
519        assert_eq!(
520            store.get(ns, "name").unwrap(),
521            Some(serde_json::json!("algocline"))
522        );
523        assert_eq!(store.get(ns, "missing").unwrap(), None);
524
525        let k = store.keys(ns).unwrap();
526        assert!(k.contains(&"count".to_string()));
527        assert!(k.contains(&"name".to_string()));
528
529        assert!(store.delete(ns, "count").unwrap());
530        assert!(!store.delete(ns, "count").unwrap());
531        assert_eq!(store.get(ns, "count").unwrap(), None);
532    }
533
534    #[test]
535    fn invalid_namespace() {
536        let (store, _tmp) = new_store();
537        assert!(store.state_path("../evil").is_err());
538        assert!(store.state_path("foo/bar").is_err());
539        assert!(store.state_path("foo\\bar").is_err());
540        assert!(store.state_path("").is_err());
541        assert!(store.state_path("foo\0bar").is_err());
542    }
543
544    #[test]
545    fn get_nonexistent_namespace_returns_empty() {
546        let (store, _tmp) = new_store();
547        let result = store.get("ghost_ns", "any_key").unwrap();
548        assert_eq!(result, None);
549    }
550
551    #[test]
552    fn keys_nonexistent_namespace_returns_empty() {
553        let (store, _tmp) = new_store();
554        let result = store.keys("ghost_ns").unwrap();
555        assert!(result.is_empty());
556    }
557
558    #[test]
559    fn delete_nonexistent_key_returns_false() {
560        let (store, _tmp) = new_store();
561        assert!(!store.delete("delns", "nope").unwrap());
562    }
563
564    #[test]
565    fn set_overwrites_existing_value() {
566        let (store, _tmp) = new_store();
567        let ns = "ow";
568
569        store.set(ns, "k", serde_json::json!(1)).unwrap();
570        store.set(ns, "k", serde_json::json!(2)).unwrap();
571        assert_eq!(store.get(ns, "k").unwrap(), Some(serde_json::json!(2)));
572    }
573
574    #[test]
575    fn state_path_valid_namespaces() {
576        let (store, _tmp) = new_store();
577        assert!(store.state_path("default").is_ok());
578        assert!(store.state_path("my-app").is_ok());
579        assert!(store.state_path("test_123").is_ok());
580    }
581
582    // ─── Tier 1: has / set_nx / incr ──────────────────────────
583
584    #[test]
585    fn has_returns_existence() {
586        let (store, _tmp) = new_store();
587        let ns = "hasns";
588
589        assert!(!store.has(ns, "x").unwrap());
590        store.set(ns, "x", serde_json::json!(1)).unwrap();
591        assert!(store.has(ns, "x").unwrap());
592    }
593
594    #[test]
595    fn set_nx_only_sets_if_absent() {
596        let (store, _tmp) = new_store();
597        let ns = "snx";
598
599        assert!(store.set_nx(ns, "k", serde_json::json!("first")).unwrap());
600        assert!(!store.set_nx(ns, "k", serde_json::json!("second")).unwrap());
601        assert_eq!(
602            store.get(ns, "k").unwrap(),
603            Some(serde_json::json!("first")),
604            "set_nx should not overwrite"
605        );
606    }
607
608    #[test]
609    fn incr_initialises_and_increments() {
610        let (store, _tmp) = new_store();
611        let ns = "inc";
612
613        // Missing key: initialise from default (0) + delta (1) = 1
614        let v = store.incr(ns, "counter", 1.0, 0.0).unwrap();
615        assert!((v - 1.0).abs() < f64::EPSILON);
616
617        // Increment existing
618        let v = store.incr(ns, "counter", 5.0, 0.0).unwrap();
619        assert!((v - 6.0).abs() < f64::EPSILON);
620
621        // Negative delta
622        let v = store.incr(ns, "counter", -2.0, 0.0).unwrap();
623        assert!((v - 4.0).abs() < f64::EPSILON);
624    }
625
626    #[test]
627    fn incr_rejects_non_numeric() {
628        let (store, _tmp) = new_store();
629        let ns = "incerr";
630
631        store.set(ns, "s", serde_json::json!("hello")).unwrap();
632        let err = store.incr(ns, "s", 1.0, 0.0).unwrap_err();
633        assert!(err.contains("not a number"), "got: {err}");
634    }
635
636    #[test]
637    fn incr_custom_default() {
638        let (store, _tmp) = new_store();
639        let ns = "incdef";
640
641        let v = store.incr(ns, "score", 10.0, 100.0).unwrap();
642        assert!((v - 110.0).abs() < f64::EPSILON, "100 + 10 = 110");
643    }
644
645    // ─── Per-key dispatch (issue #1776868812) ─────────────────────────
646
647    /// Keys shaped `{prefix}:{id}` with safe segments are written to
648    /// `{root}/{prefix}/{id}.json` rather than crammed into the legacy
649    /// `{ns}.json` SSoT.
650    #[test]
651    fn dispatch_writes_to_per_key_file_for_prefix_id_keys() {
652        let (store, tmp) = new_store();
653        store
654            .set(
655                "default",
656                "flow_orch:abc-123",
657                serde_json::json!({"step": 1}),
658            )
659            .unwrap();
660        let dispatched = tmp.path().join("flow_orch").join("abc-123.json");
661        assert!(
662            dispatched.exists(),
663            "dispatched file must exist at {}",
664            dispatched.display()
665        );
666        // Legacy file must NOT have been touched for this key.
667        let legacy = tmp.path().join("default.json");
668        assert!(
669            !legacy.exists(),
670            "legacy default.json must not be created for dispatched keys"
671        );
672    }
673
674    /// Read path: dispatched file takes precedence; legacy `{ns}.json`
675    /// is consulted only when the dispatched file is absent (so
676    /// pre-dispatch entries remain readable without migration).
677    #[test]
678    fn dispatch_read_falls_back_to_legacy_for_unmigrated_entries() {
679        let (store, tmp) = new_store();
680        // Pre-populate the legacy default.json by writing a key without
681        // a `:` (forces the legacy path) then manually inject the
682        // dispatched-shaped key into the same file to simulate a state
683        // produced before dispatch was enabled.
684        store
685            .set("default", "boot_marker", serde_json::json!(true))
686            .unwrap();
687        let legacy_path = tmp.path().join("default.json");
688        let mut existing: HashMap<String, Value> =
689            serde_json::from_str(&std::fs::read_to_string(&legacy_path).unwrap()).unwrap();
690        existing.insert(
691            "flow_legacy:xyz".to_string(),
692            serde_json::json!({"old": "value"}),
693        );
694        std::fs::write(
695            &legacy_path,
696            serde_json::to_string_pretty(&existing).unwrap(),
697        )
698        .unwrap();
699
700        // Read returns the legacy value because no dispatched file exists.
701        assert_eq!(
702            store.get("default", "flow_legacy:xyz").unwrap(),
703            Some(serde_json::json!({"old": "value"})),
704            "must fall back to legacy default.json when dispatched file absent"
705        );
706
707        // Once we set a new value, it lands in the dispatched file and
708        // future reads see the new value (legacy entry is shadowed).
709        store
710            .set(
711                "default",
712                "flow_legacy:xyz",
713                serde_json::json!({"new": "promoted"}),
714            )
715            .unwrap();
716        assert!(
717            tmp.path().join("flow_legacy").join("xyz.json").exists(),
718            "set() must promote dispatched-shaped keys to per-key file"
719        );
720        assert_eq!(
721            store.get("default", "flow_legacy:xyz").unwrap(),
722            Some(serde_json::json!({"new": "promoted"})),
723            "dispatched file must shadow legacy entry on subsequent reads"
724        );
725    }
726
727    /// Keys without a `:` separator (or with unsafe characters in either
728    /// segment) bypass dispatch and use the legacy single-file store.
729    #[test]
730    fn dispatch_skips_keys_without_colon_or_with_unsafe_segments() {
731        let (store, tmp) = new_store();
732        store
733            .set("default", "no_colon", serde_json::json!(1))
734            .unwrap();
735        store
736            .set("default", "bad/prefix:id", serde_json::json!(2))
737            .unwrap();
738        store
739            .set("default", "prefix:bad/id", serde_json::json!(3))
740            .unwrap();
741        store
742            .set("default", "prefix:..", serde_json::json!(4))
743            .unwrap();
744        // All four go to legacy default.json.
745        let legacy = tmp.path().join("default.json");
746        let raw: HashMap<String, Value> =
747            serde_json::from_str(&std::fs::read_to_string(&legacy).unwrap()).unwrap();
748        assert_eq!(raw.get("no_colon"), Some(&serde_json::json!(1)));
749        assert_eq!(raw.get("bad/prefix:id"), Some(&serde_json::json!(2)));
750        assert_eq!(raw.get("prefix:bad/id"), Some(&serde_json::json!(3)));
751        assert_eq!(raw.get("prefix:.."), Some(&serde_json::json!(4)));
752        // No subdirectories were created.
753        assert!(!tmp.path().join("bad").exists());
754        assert!(!tmp.path().join("prefix").exists());
755    }
756
757    /// `delete` removes the dispatched file and returns `true`.
758    #[test]
759    fn dispatch_delete_removes_per_key_file() {
760        let (store, tmp) = new_store();
761        store.set("default", "p:q", serde_json::json!("v")).unwrap();
762        let dispatched = tmp.path().join("p").join("q.json");
763        assert!(
764            dispatched.exists(),
765            "dispatched file should exist before delete"
766        );
767        assert!(store.delete("default", "p:q").unwrap());
768        assert!(
769            !dispatched.exists(),
770            "dispatched file should be removed after delete"
771        );
772        // Re-deleting returns false.
773        assert!(!store.delete("default", "p:q").unwrap());
774    }
775
776    /// `has` reflects dispatched file existence.
777    #[test]
778    fn dispatch_has_reports_dispatched_file_existence() {
779        let (store, _tmp) = new_store();
780        assert!(!store.has("default", "p:q").unwrap());
781        store.set("default", "p:q", serde_json::json!("v")).unwrap();
782        assert!(store.has("default", "p:q").unwrap());
783    }
784
785    /// `set_nx` honours both the dispatched file and any legacy entry
786    /// to keep set-if-not-exists semantics consistent across migration.
787    #[test]
788    fn dispatch_set_nx_blocks_when_legacy_or_dispatched_entry_exists() {
789        let (store, tmp) = new_store();
790        // Inject a legacy entry under the dispatched-shaped key.
791        store
792            .set("default", "boot", serde_json::json!(true))
793            .unwrap();
794        let legacy_path = tmp.path().join("default.json");
795        let mut existing: HashMap<String, Value> =
796            serde_json::from_str(&std::fs::read_to_string(&legacy_path).unwrap()).unwrap();
797        existing.insert("p:q".to_string(), serde_json::json!("legacy_only"));
798        std::fs::write(
799            &legacy_path,
800            serde_json::to_string_pretty(&existing).unwrap(),
801        )
802        .unwrap();
803        // set_nx must refuse because the legacy entry exists.
804        assert!(!store
805            .set_nx("default", "p:q", serde_json::json!("new"))
806            .unwrap());
807
808        // For a fresh dispatched-shaped key with no legacy entry, set_nx
809        // creates the dispatched file and returns true; second call
810        // returns false because the dispatched file now exists.
811        assert!(store
812            .set_nx("default", "p:r", serde_json::json!("first"))
813            .unwrap());
814        assert!(tmp.path().join("p").join("r.json").exists());
815        assert!(!store
816            .set_nx("default", "p:r", serde_json::json!("second"))
817            .unwrap());
818    }
819
820    /// `incr` operates on the dispatched file when the key matches the
821    /// dispatch pattern; legacy values are migrated forward on the
822    /// first call.
823    #[test]
824    fn dispatch_incr_promotes_legacy_value_on_first_call() {
825        let (store, tmp) = new_store();
826        // Pre-populate a legacy numeric value under a dispatched-shaped key.
827        store.set("default", "seed", serde_json::json!(0)).unwrap();
828        let legacy_path = tmp.path().join("default.json");
829        let mut existing: HashMap<String, Value> =
830            serde_json::from_str(&std::fs::read_to_string(&legacy_path).unwrap()).unwrap();
831        existing.insert("counter:cnt".to_string(), serde_json::json!(7));
832        std::fs::write(
833            &legacy_path,
834            serde_json::to_string_pretty(&existing).unwrap(),
835        )
836        .unwrap();
837
838        // First incr: reads legacy value (7), writes new value (10) to
839        // the dispatched file.
840        let result = store.incr("default", "counter:cnt", 3.0, 0.0).unwrap();
841        assert_eq!(result, 10.0);
842        let dispatched = tmp.path().join("counter").join("cnt.json");
843        assert!(dispatched.exists(), "dispatched file must be created");
844
845        // Second incr: reads dispatched (10), writes 12.
846        let result2 = store.incr("default", "counter:cnt", 2.0, 0.0).unwrap();
847        assert_eq!(result2, 12.0);
848    }
849
850    /// `is_safe_segment` accepts alphanumerics + `_-.` and rejects
851    /// path traversal sequences and reserved names.
852    #[test]
853    fn is_safe_segment_validates_path_safety() {
854        assert!(is_safe_segment("flow_orch"));
855        assert!(is_safe_segment("abc-123"));
856        assert!(is_safe_segment("v1.2.3"));
857        assert!(!is_safe_segment(""));
858        assert!(!is_safe_segment("."));
859        assert!(!is_safe_segment(".."));
860        assert!(!is_safe_segment("a..b"));
861        assert!(!is_safe_segment("a/b"));
862        assert!(!is_safe_segment("a\\b"));
863        assert!(!is_safe_segment("a b"));
864        assert!(!is_safe_segment("a\0b"));
865    }
866}
867
868#[cfg(test)]
869mod proptests {
870    use super::*;
871    use proptest::prelude::*;
872
873    fn new_store() -> (JsonFileStore, tempfile::TempDir) {
874        let tmp = tempfile::tempdir().unwrap();
875        let store = JsonFileStore::new(tmp.path().to_path_buf());
876        (store, tmp)
877    }
878
879    proptest! {
880        /// Any valid namespace (alphanumeric + hyphen/underscore) round-trips through set/get.
881        #[test]
882        fn roundtrip_arbitrary_values(
883            key in "[a-z]{1,20}",
884            val in any::<i64>(),
885        ) {
886            let (store, _tmp) = new_store();
887            let ns = "rt";
888            let json_val = serde_json::json!(val);
889            store.set(ns, &key, json_val.clone()).unwrap();
890            let got = store.get(ns, &key).unwrap();
891            prop_assert_eq!(got, Some(json_val));
892            let _ = store.delete(ns, &key);
893        }
894
895        /// Path traversal patterns are always rejected.
896        #[test]
897        fn traversal_always_rejected(
898            prefix in "[a-z]{0,5}",
899            suffix in "[a-z]{0,5}",
900        ) {
901            let (store, _tmp) = new_store();
902            let evil = format!("{prefix}/../{suffix}");
903            prop_assert!(store.state_path(&evil).is_err());
904        }
905
906        /// state_path rejects NUL bytes anywhere in the namespace.
907        #[test]
908        fn nul_byte_always_rejected(
909            prefix in "[a-z]{0,10}",
910            suffix in "[a-z]{0,10}",
911        ) {
912            let (store, _tmp) = new_store();
913            let evil = format!("{prefix}\0{suffix}");
914            prop_assert!(store.state_path(&evil).is_err());
915        }
916    }
917}