Skip to main content

algocline_engine/
state.rs

1//! Persistent key-value state backed by JSON files.
2//!
3//! ## Architecture
4//!
//! All state operations go through the [`StateStore`] trait, which
//! abstracts the storage backend.  The default implementation,
//! [`JsonFileStore`], persists each namespace as a JSON file under a
//! caller-provided root directory with atomic writes (tmp + rename).
//! Keys shaped `{prefix}:{id}` are the exception: each one is stored in
//! its own file at `{root}/{prefix}/{id}.json`.
9//!
10//! ## Tier 1 — Current API
11//!
12//! | Operation | Description |
13//! |-----------|-------------|
14//! | `get` | Read a value (returns `None` if absent) |
15//! | `set` | Write a value (upsert) |
16//! | `delete` | Remove a key (returns whether it existed) |
17//! | `keys` | List all keys in a namespace |
18//! | `has` | Check existence (cost is backend-dependent) |
19//! | `set_nx` | Set-if-not-exists (returns `false` if key already present) |
20//! | `incr` | Counter increment — single-process atomic (read-modify-write) |
21//!
22//! ## Tier 2 — Future Extensions (design notes, not yet implemented)
23//!
24//! The following operations are planned but **not yet implemented**.
25//! The trait is designed to accommodate them without breaking changes.
26//! Review this list when adding a new backend.
27//!
28//! - **TTL**: `set(key, value, opts)` with `opts.ttl_secs`, plus
29//!   `ttl(key) -> Option<Duration>` to query remaining time.  Useful
30//!   for caching patterns (e.g. Hub index cache, LLM response cache).
31//! - **Batch**: `mget(keys) -> Vec<Option<Value>>` and
32//!   `mset(pairs) -> Result<()>`.  Reduces I/O round-trips for
33//!   file/network backends.
34//! - **clear**: Flush all keys in a namespace.  OpenResty's
35//!   `flush_all` equivalent.
36//!
37//! ## Backend Swappability
38//!
39//! Because the engine interacts with state only through the
40//! [`StateStore`] trait, backends can be swapped without changing Lua
41//! code.  Planned backends:
42//!
43//! - `JsonFileStore` (current, default)
44//! - In-memory `HashMap` (for tests and short-lived sessions)
45//! - SQLite (for larger datasets with indexed queries)
46//! - Redis (for distributed / multi-process scenarios)
47
48use std::collections::HashMap;
49use std::fs;
50use std::path::{Path, PathBuf};
51use std::sync::{Arc, Mutex};
52
53use serde_json::Value;
54
55// ═══════════════════════════════════════════════════════════════
56// Typed error for dispatched-layout operations
57// ═══════════════════════════════════════════════════════════════
58
59/// Errors returned by the dispatched-layout helpers
60/// (`list_dispatched`, `show_dispatched`, `reset_dispatched_with_backup`).
61///
62/// Unlike the legacy [`StateStore`] trait methods (which return `String`
63/// errors), these helpers use a typed enum so callers can distinguish
64/// missing-key from I/O failure at the type level without pattern-matching
65/// on OS error codes.
66#[derive(thiserror::Error, Debug)]
67pub enum StateError {
68    /// The requested key does not exist in the given namespace.
69    ///
70    /// # Arguments
71    /// * `namespace` — the namespace that was searched
72    /// * `key` — the key that was not found
73    #[error("state: key '{key}' not found in namespace '{namespace}'")]
74    KeyNotFound { namespace: String, key: String },
75
76    /// A namespace or key segment failed the path-safety check.
77    ///
78    /// # Arguments
79    /// * `which` — either `"namespace"` or `"key"`
80    /// * `value` — the offending segment value
81    #[error("state: unsafe {which} segment '{value}'")]
82    UnsafeSegment { which: &'static str, value: String },
83
84    /// A backup I/O operation (`fs::copy` to `.bak`) failed.
85    ///
86    /// Wraps the underlying [`std::io::Error`].  Kept separate from
87    /// [`StateError::IoWrite`] so callers know that the live file was
88    /// not yet touched when this error occurs.
89    #[error("state: backup I/O failed: {0}")]
90    IoBackup(#[source] std::io::Error),
91
92    /// A read or directory-scan operation failed.
93    ///
94    /// Wraps the underlying [`std::io::Error`].  Covers `fs::read_to_string`,
95    /// `fs::read_dir`, and `DirEntry` iteration.
96    #[error("state: read failed: {0}")]
97    IoRead(#[source] std::io::Error),
98
99    /// A write or rename operation on the live file or its `.tmp`
100    /// staging copy failed.
101    ///
102    /// Wraps the underlying [`std::io::Error`].  Covers `fs::write` and
103    /// `fs::rename`.
104    #[error("state: write failed: {0}")]
105    IoWrite(#[source] std::io::Error),
106
107    /// JSON serialization or deserialization failed.
108    ///
109    /// Uses `#[from]` so `?` auto-converts `serde_json::Error`.
110    #[error("state: serialize/parse failed: {0}")]
111    Serde(#[from] serde_json::Error),
112
113    /// The stored JSON does not have the expected shape.
114    ///
115    /// # Arguments
116    /// * `reason` — human-readable description of the shape violation
117    ///   (e.g. `"missing 'data' top-level field"` or
118    ///   `"data.completed_steps must be an array"`)
119    #[error("state: shape invalid: {reason}")]
120    ShapeInvalid { reason: String },
121}
122
123// ═══════════════════════════════════════════════════════════════
124// ResetReport — return value of reset_dispatched_with_backup
125// ═══════════════════════════════════════════════════════════════
126
/// Summary of what [`JsonFileStore::reset_dispatched_with_backup`] changed.
#[derive(Clone, Debug)]
pub struct ResetReport {
    /// Location of the `.bak` copy taken before the live file was rewritten.
    pub backup_path: PathBuf,
    /// How many entries were dropped from `data.completed_steps`.
    pub steps_removed: usize,
    /// How many keys were deleted from the top-level `data` object.
    pub fields_removed: usize,
}
138
/// Decide whether `s` may be used as a single path component.
///
/// A segment is safe when it is non-empty, is not the reserved name `.`,
/// contains no `..` anywhere, and consists solely of ASCII letters,
/// ASCII digits, `_`, `-` and `.`.  Slashes, backslashes, NUL, control
/// characters, whitespace and any non-ASCII byte are therefore rejected.
///
/// `dispatch_path` treats a rejected segment as "not a dispatched key"
/// and falls back to legacy single-file storage; the `*_dispatched`
/// helpers turn a rejection into `StateError::UnsafeSegment`.
fn is_safe_segment(s: &str) -> bool {
    // The `..` substring test also covers the bare `..` segment.
    let allowed =
        |b: u8| matches!(b, b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'_' | b'-' | b'.');
    !s.is_empty() && s != "." && !s.contains("..") && s.bytes().all(allowed)
}
156
157// ═══════════════════════════════════════════════════════════════
158// Trait
159// ═══════════════════════════════════════════════════════════════
160
161/// Backend-agnostic key-value state store.
162///
163/// All operations are namespace-scoped.  Implementations must be
164/// `Send + Sync` so they can be shared across Lua VMs (e.g. fork).
165pub trait StateStore: Send + Sync {
166    /// Read a value.  Returns `None` if the key does not exist.
167    fn get(&self, ns: &str, key: &str) -> Result<Option<Value>, String>;
168
169    /// Write a value (upsert).
170    fn set(&self, ns: &str, key: &str, value: Value) -> Result<(), String>;
171
172    /// Remove a key.  Returns `true` if it existed.
173    fn delete(&self, ns: &str, key: &str) -> Result<bool, String>;
174
175    /// List all keys in a namespace.
176    fn keys(&self, ns: &str) -> Result<Vec<String>, String>;
177
178    /// Check whether a key exists.
179    ///
180    /// Whether this is cheaper than `get` + nil check depends on the
181    /// backend.  `JsonFileStore` still loads the whole namespace; backends
182    /// like Redis or SQLite can answer with an `EXISTS` command.
183    fn has(&self, ns: &str, key: &str) -> Result<bool, String>;
184
185    /// Set a value only if the key does **not** already exist.
186    /// Returns `true` if the value was written, `false` if the key
187    /// was already present.
188    ///
189    /// **Note:** `JsonFileStore` serialises this operation per namespace
190    /// with an in-process `Mutex`, making it safe across concurrent tokio
191    /// tasks within the same process.  Cross-process atomicity still
192    /// requires a backend with native CAS (Redis `SETNX`, SQLite
193    /// transactions).
194    fn set_nx(&self, ns: &str, key: &str, value: Value) -> Result<bool, String>;
195
196    /// Counter increment, serialised per namespace within the same process.
197    ///
198    /// Adds `delta` to the current numeric value at `key`.  If the key
199    /// is missing, initialises it to `default` before adding.  Returns
200    /// the new value.
201    ///
202    /// `JsonFileStore` acquires a per-namespace `Mutex` for the full
203    /// read-modify-write cycle, preventing lost updates across concurrent
204    /// tokio tasks.  For multi-process safety use a backend with native
205    /// `INCR` (Redis) or transactions (SQLite).
206    ///
207    /// Uses `f64` internally.  Integer-valued deltas are exact; fractional
208    /// deltas may accumulate floating-point rounding errors over many calls.
209    ///
210    /// Errors if the existing value is not a JSON number.
211    fn incr(&self, ns: &str, key: &str, delta: f64, default: f64) -> Result<f64, String>;
212}
213
214// ═══════════════════════════════════════════════════════════════
215// JsonFileStore — default backend
216// ═══════════════════════════════════════════════════════════════
217
/// State store that keeps its data in JSON files.
///
/// Two on-disk layouts coexist under the root directory:
///
/// * **Legacy** — one file per namespace at `{root}/{namespace}.json`,
///   holding a map of key → value.
/// * **Dispatched** — keys shaped `{prefix}:{id}` get a file of their own
///   at `{root}/{prefix}/{id}.json`, holding the raw value.
///
/// Writes are atomic in both layouts: the new content goes to a `.tmp`
/// sibling which is then renamed over the live file.
///
/// The root directory is supplied at construction time; callers are
/// expected to resolve it from the service-layer `AppDir` abstraction
/// (typically `~/.algocline/state/`).
///
/// ## Concurrency
///
/// A `std::sync::Mutex` per file path prevents lost updates under
/// concurrent `alc.state.*` calls within one process.  The lock is held
/// for the whole load → mutate → atomic-rename cycle, so two tokio tasks
/// working on the **same** file are serialised.
///
/// `std::sync::Mutex` is used rather than `tokio::sync::Mutex` because
/// all I/O inside the lock goes through `std::fs` (synchronous, no
/// `.await`); a standard mutex is sufficient and avoids holding a tokio
/// mutex across potential scheduler context switches.
///
/// **Multi-process safety is NOT provided.**  If several `alc` processes
/// share one state directory (uncommon), use a backend with native `INCR`
/// (Redis) or transactions (SQLite).
pub struct JsonFileStore {
    /// Directory under which every state file is placed.
    root: PathBuf,
    /// One lock per state file, keyed by the resolved file path, so any
    /// name validation has already happened before the lookup.
    locks: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>>,
}
249
250impl JsonFileStore {
251    /// Construct a store rooted at an explicit path.
252    ///
253    /// The directory is **not** created eagerly; it is created lazily
254    /// on the first `set` / `set_nx` / `incr` call via [`Self::state_path`].
255    pub fn new(root: PathBuf) -> Self {
256        Self {
257            root,
258            locks: Mutex::new(HashMap::new()),
259        }
260    }
261
262    /// Acquire (or create) the per-namespace lock and return a guard.
263    ///
264    /// The returned `std::sync::MutexGuard` keeps the namespace lock
265    /// held for the duration of the caller's load → mutate → save cycle.
266    /// The outer `locks` map is released immediately after the inner
267    /// `Arc` is cloned, so contention on unrelated namespaces is zero.
268    fn ns_lock(&self, path: &Path) -> Result<Arc<Mutex<()>>, String> {
269        let mut map = self
270            .locks
271            .lock()
272            .map_err(|_| "state: locks map poisoned".to_string())?;
273        Ok(Arc::clone(
274            map.entry(path.to_path_buf())
275                .or_insert_with(|| Arc::new(Mutex::new(()))),
276        ))
277    }
278
279    /// Return the root directory this store writes under.
280    pub fn root(&self) -> &Path {
281        &self.root
282    }
283
284    /// Ensure the root directory exists, returning it.
285    fn ensure_root(&self) -> Result<&Path, String> {
286        if !self.root.exists() {
287            fs::create_dir_all(&self.root)
288                .map_err(|e| format!("Failed to create state dir: {e}"))?;
289        }
290        Ok(&self.root)
291    }
292
293    /// Resolve the JSON file path for a namespace, validating the name
294    /// and creating the root directory on demand.
295    pub fn state_path(&self, ns: &str) -> Result<PathBuf, String> {
296        if ns.contains('/')
297            || ns.contains('\\')
298            || ns.contains("..")
299            || ns.contains('\0')
300            || ns.is_empty()
301        {
302            return Err(format!("Invalid namespace: '{ns}'"));
303        }
304        let dir = self.ensure_root()?;
305        Ok(dir.join(format!("{ns}.json")))
306    }
307
308    /// Resolve the per-key dispatched file path for a `{prefix}:{id}`
309    /// shaped key, returning `None` when the key does not match the
310    /// dispatch contract (no `:`, multiple `:`, or unsafe characters
311    /// in either segment).
312    ///
313    /// Dispatched layout (issue #1776868812):
314    ///   `{root}/{prefix}/{id}.json` — file contents = the value as
315    ///   raw JSON (no wrapper map). Each `flow.state_save(state)` call
316    ///   becomes a single per-task file rather than another entry
317    ///   crammed into `default.json`.
318    ///
319    /// Legacy layout: keys without `:` (or with unsafe characters)
320    /// continue writing into `{ns}.json` so existing behaviour is
321    /// preserved without migration.
322    fn dispatch_path(&self, key: &str) -> Result<Option<PathBuf>, String> {
323        let (prefix, id) = match key.split_once(':') {
324            Some(pair) => pair,
325            None => return Ok(None),
326        };
327        if !is_safe_segment(prefix) || !is_safe_segment(id) {
328            return Ok(None);
329        }
330        let dir = self.ensure_root()?;
331        Ok(Some(dir.join(prefix).join(format!("{id}.json"))))
332    }
333
334    /// Read a dispatched value file and deserialize it as JSON.
335    /// Returns `Ok(None)` when the file does not exist.
336    fn load_dispatched(&self, path: &Path) -> Result<Option<Value>, String> {
337        if !path.exists() {
338            return Ok(None);
339        }
340        let content = fs::read_to_string(path)
341            .map_err(|e| format!("Failed to read dispatched state '{}': {e}", path.display()))?;
342        let v: Value = serde_json::from_str(&content)
343            .map_err(|e| format!("Failed to parse dispatched state '{}': {e}", path.display()))?;
344        Ok(Some(v))
345    }
346
347    /// Atomically write a value to a dispatched file (tmp + rename).
348    /// Creates the prefix subdirectory if missing.
349    fn save_dispatched(&self, path: &Path, value: &Value) -> Result<(), String> {
350        if let Some(parent) = path.parent() {
351            if !parent.exists() {
352                fs::create_dir_all(parent).map_err(|e| {
353                    format!(
354                        "Failed to create dispatched state dir '{}': {e}",
355                        parent.display()
356                    )
357                })?;
358            }
359        }
360        let tmp = path.with_extension("json.tmp");
361        let content = serde_json::to_string_pretty(value)
362            .map_err(|e| format!("Failed to serialize dispatched state: {e}"))?;
363        fs::write(&tmp, &content)
364            .map_err(|e| format!("Failed to write dispatched state tmp: {e}"))?;
365        fs::rename(&tmp, path)
366            .map_err(|e| format!("Failed to rename dispatched state file: {e}"))?;
367        Ok(())
368    }
369
370    // ─── Dispatched-layout helpers ─────────────────────────────────────────
371
372    /// List all keys in the dispatched layout for a namespace.
373    ///
374    /// Enumerates `{root}/{namespace}/*.json` and returns the file names
375    /// stripped of the `.json` extension, sorted lexicographically.
376    /// Files with extensions other than `.json`, and `.bak` / `.tmp`
377    /// siblings, are excluded.  If the namespace directory does not exist
378    /// the result is an empty `Vec` (namespace-absent ≡ zero keys).
379    ///
380    /// # Arguments
381    /// * `namespace` — the directory name under `root`; must pass
382    ///   [`is_safe_segment`] validation
383    ///
384    /// # Returns
385    /// A sorted list of key strings, or a [`StateError`] on I/O / validation
386    /// failure.
387    ///
388    /// # Errors
389    /// * [`StateError::UnsafeSegment`] if `namespace` fails path-safety check
390    /// * [`StateError::IoRead`] if reading the directory fails
391    pub fn list_dispatched(&self, namespace: &str) -> Result<Vec<String>, StateError> {
392        if !is_safe_segment(namespace) {
393            return Err(StateError::UnsafeSegment {
394                which: "namespace",
395                value: namespace.to_string(),
396            });
397        }
398        let ns_dir = self.root.join(namespace);
399        if !ns_dir.exists() {
400            return Ok(Vec::new());
401        }
402        let mut keys = Vec::new();
403        let entries = fs::read_dir(&ns_dir).map_err(StateError::IoRead)?;
404        for entry in entries {
405            let entry = entry.map_err(StateError::IoRead)?;
406            let fname = entry.file_name();
407            let fname_str = fname.to_string_lossy();
408            // Only include plain `.json` files; skip `.bak`, `.tmp`, and others.
409            if !fname_str.ends_with(".json")
410                || fname_str.ends_with(".json.bak")
411                || fname_str.ends_with(".json.tmp")
412            {
413                continue;
414            }
415            // Strip the `.json` suffix to recover the key name.
416            let key = fname_str
417                .strip_suffix(".json")
418                .unwrap_or(&fname_str)
419                .to_string();
420            keys.push(key);
421        }
422        keys.sort();
423        Ok(keys)
424    }
425
426    /// Read the full JSON value for a dispatched-layout key.
427    ///
428    /// Reads `{root}/{namespace}/{key}.json` and deserializes it.
429    ///
430    /// # Arguments
431    /// * `namespace` — the subdirectory name; must pass [`is_safe_segment`]
432    /// * `key` — the file stem; must pass [`is_safe_segment`]
433    ///
434    /// # Returns
435    /// The deserialized [`serde_json::Value`] on success.
436    ///
437    /// # Errors
438    /// * [`StateError::UnsafeSegment`] if either segment fails path-safety check
439    /// * [`StateError::KeyNotFound`] if the file does not exist
440    /// * [`StateError::IoRead`] if the file cannot be read
441    /// * [`StateError::Serde`] if the content is not valid JSON
442    pub fn show_dispatched(
443        &self,
444        namespace: &str,
445        key: &str,
446    ) -> Result<serde_json::Value, StateError> {
447        if !is_safe_segment(namespace) {
448            return Err(StateError::UnsafeSegment {
449                which: "namespace",
450                value: namespace.to_string(),
451            });
452        }
453        if !is_safe_segment(key) {
454            return Err(StateError::UnsafeSegment {
455                which: "key",
456                value: key.to_string(),
457            });
458        }
459        let target = self.root.join(namespace).join(format!("{key}.json"));
460        if !target.exists() {
461            return Err(StateError::KeyNotFound {
462                namespace: namespace.to_string(),
463                key: key.to_string(),
464            });
465        }
466        let content = fs::read_to_string(&target).map_err(StateError::IoRead)?;
467        let value: serde_json::Value = serde_json::from_str(&content)?;
468        Ok(value)
469    }
470
471    /// Write or overwrite a state file in the dispatched layout.
472    ///
473    /// For a new path (file does not exist), writes directly without creating a
474    /// `.bak` file (no prior state to back up).  For an existing path (overwrite),
475    /// copies the live file to `{key}.json.bak` **before** the mutation so the
476    /// original is always recoverable on crash (Crux §3 atomicity contract).
477    ///
478    /// # Arguments
479    /// * `ns` — namespace (subdirectory name); must pass [`is_safe_segment`]
480    /// * `key` — file stem; must pass [`is_safe_segment`]
481    /// * `value` — JSON value to write
482    ///
483    /// # Errors
484    /// * [`StateError::UnsafeSegment`] — segment validation failed
485    /// * [`StateError::IoBackup`] — `.bak` copy failed (overwrite path only)
486    /// * [`StateError::IoWrite`] — tmp write or atomic rename failed
487    pub fn set_dispatched(
488        &self,
489        ns: &str,
490        key: &str,
491        value: &serde_json::Value,
492    ) -> Result<(), StateError> {
493        if !is_safe_segment(ns) {
494            return Err(StateError::UnsafeSegment {
495                which: "namespace",
496                value: ns.to_string(),
497            });
498        }
499        if !is_safe_segment(key) {
500            return Err(StateError::UnsafeSegment {
501                which: "key",
502                value: key.to_string(),
503            });
504        }
505        let target = self.root.join(ns).join(format!("{key}.json"));
506        if !target.exists() {
507            // New path: no prior state, write directly (no .bak needed).
508            return self
509                .save_dispatched(&target, value)
510                .map_err(|s| StateError::IoWrite(std::io::Error::other(s)));
511        }
512        // Overwrite path: copy .bak BEFORE mutation (Crux §3: backup before any mutation).
513        let target_bak = target.with_extension("json.bak");
514        fs::copy(&target, &target_bak).map_err(StateError::IoBackup)?;
515        self.save_dispatched(&target, value)
516            .map_err(|s| StateError::IoWrite(std::io::Error::other(s)))
517    }
518
519    /// Delete a state file in the dispatched layout.
520    ///
521    /// Returns `Ok(false)` if the file does not exist (idempotent no-op; no
522    /// `.bak` is created because there is no mutation).  Returns `Ok(true)`
523    /// after copying the live file to `{key}.json.bak` and then removing it
524    /// (Crux §3 atomicity contract: backup before any mutation).
525    ///
526    /// # Arguments
527    /// * `ns` — namespace (subdirectory name); must pass [`is_safe_segment`]
528    /// * `key` — file stem; must pass [`is_safe_segment`]
529    ///
530    /// # Errors
531    /// * [`StateError::UnsafeSegment`] — segment validation failed
532    /// * [`StateError::IoBackup`] — `.bak` copy failed
533    /// * [`StateError::IoWrite`] — `remove_file` failed
534    pub fn delete_dispatched(&self, ns: &str, key: &str) -> Result<bool, StateError> {
535        if !is_safe_segment(ns) {
536            return Err(StateError::UnsafeSegment {
537                which: "namespace",
538                value: ns.to_string(),
539            });
540        }
541        if !is_safe_segment(key) {
542            return Err(StateError::UnsafeSegment {
543                which: "key",
544                value: key.to_string(),
545            });
546        }
547        let target = self.root.join(ns).join(format!("{key}.json"));
548        if !target.exists() {
549            // Key absent: no mutation, idempotent no-op (Crux §2: Ok(false) path).
550            return Ok(false);
551        }
552        // Key present: copy .bak BEFORE removal (Crux §3: backup before any mutation).
553        let target_bak = target.with_extension("json.bak");
554        fs::copy(&target, &target_bak).map_err(StateError::IoBackup)?;
555        fs::remove_file(&target).map_err(StateError::IoWrite)?;
556        Ok(true)
557    }
558
559    /// Atomically reset a dispatched-layout state file with a backup.
560    ///
561    /// Performs the following sequence in order (Crux atomicity contract):
562    ///
563    /// 1. Validate `namespace` and `key` with [`is_safe_segment`].
564    /// 2. Compute target path: `{root}/{namespace}/{key}.json`.
565    /// 3. Return [`StateError::KeyNotFound`] if the file does not exist.
566    /// 4. Acquire the per-path mutex via [`Self::ns_lock`]; hold until rename.
567    /// 5. Copy the live file to `{root}/{namespace}/{key}.json.bak` — the
568    ///    live file is **not** touched before this point.
569    /// 6. Load and parse the live file.
570    /// 7. Apply in-memory mutations:
571    ///    - Remove each element of `steps` from `data.completed_steps` (if
572    ///      the array exists).
573    ///    - Delete each element of `fields` from the `data` top-level object.
574    ///    - If the top-level `data` field is absent or not an object, return
575    ///      [`StateError::ShapeInvalid`].
576    /// 8. Write the mutated value to `{target}.tmp`.
577    /// 9. Rename `.tmp` → target (POSIX atomic on same filesystem).
578    ///
579    /// A crash between steps 5 and 9 leaves the `.bak` intact and the live
580    /// file unmodified (or only partially written to `.tmp`), so the original
581    /// state is always recoverable.
582    ///
583    /// # Arguments
584    /// * `namespace` — subdirectory name; must pass [`is_safe_segment`]
585    /// * `key` — file stem; must pass [`is_safe_segment`]
586    /// * `steps` — step names to remove from `data.completed_steps`
587    /// * `fields` — field names to delete from the `data` top-level object
588    ///
589    /// # Returns
590    /// A [`ResetReport`] with the backup path and counts of removed items.
591    ///
592    /// # Errors
593    /// * [`StateError::UnsafeSegment`] if either segment fails path-safety check
594    /// * [`StateError::KeyNotFound`] if the file does not exist
595    /// * [`StateError::ShapeInvalid`] if the lock is poisoned or the JSON
596    ///   structure is not a `{data: {...}}` object
597    /// * [`StateError::IoBackup`] if the `.bak` copy fails
598    /// * [`StateError::IoRead`] if loading the live file fails
599    /// * [`StateError::IoWrite`] if the `.tmp` write or rename fails
600    /// * [`StateError::Serde`] if the file content is not valid JSON
601    pub fn reset_dispatched_with_backup(
602        &self,
603        namespace: &str,
604        key: &str,
605        steps: &[String],
606        fields: &[String],
607    ) -> Result<ResetReport, StateError> {
608        // (a) Validate path segments.
609        if !is_safe_segment(namespace) {
610            return Err(StateError::UnsafeSegment {
611                which: "namespace",
612                value: namespace.to_string(),
613            });
614        }
615        if !is_safe_segment(key) {
616            return Err(StateError::UnsafeSegment {
617                which: "key",
618                value: key.to_string(),
619            });
620        }
621
622        // (b) Compute target path.
623        let target = self.root.join(namespace).join(format!("{key}.json"));
624
625        // (c) Return KeyNotFound if the file does not exist.
626        if !target.exists() {
627            return Err(StateError::KeyNotFound {
628                namespace: namespace.to_string(),
629                key: key.to_string(),
630            });
631        }
632
633        // (c.5) Acquire the per-path mutex and hold it until after rename.
634        let lock = self
635            .ns_lock(&target)
636            .map_err(|s| StateError::ShapeInvalid { reason: s })?;
637        let _guard = lock.lock().map_err(|_| StateError::ShapeInvalid {
638            reason: "lock poisoned".to_string(),
639        })?;
640
641        // (d) Create .bak backup — live file is not touched before this.
642        let bak_path = target.with_extension("json.bak");
643        fs::copy(&target, &bak_path).map_err(StateError::IoBackup)?;
644
645        // (e) Load and parse the live file.
646        let content = fs::read_to_string(&target).map_err(StateError::IoRead)?;
647        let mut value: serde_json::Value = serde_json::from_str(&content)?;
648
649        // (f) Apply in-memory mutations.
650        let data = value
651            .get_mut("data")
652            .ok_or_else(|| StateError::ShapeInvalid {
653                reason: "missing 'data' top-level field".to_string(),
654            })?;
655        let data_obj = data
656            .as_object_mut()
657            .ok_or_else(|| StateError::ShapeInvalid {
658                reason: "'data' top-level field must be an object".to_string(),
659            })?;
660
661        // Remove matching entries from data.completed_steps.
662        let mut steps_removed = 0usize;
663        if !steps.is_empty() {
664            if let Some(cs) = data_obj.get_mut("completed_steps") {
665                if let Some(arr) = cs.as_array_mut() {
666                    let before = arr.len();
667                    arr.retain(|v| {
668                        if let Some(s) = v.as_str() {
669                            !steps.iter().any(|step| step == s)
670                        } else {
671                            true
672                        }
673                    });
674                    steps_removed = before - arr.len();
675                } else {
676                    return Err(StateError::ShapeInvalid {
677                        reason: "data.completed_steps must be an array".to_string(),
678                    });
679                }
680            }
681            // If completed_steps key is absent, nothing to remove — not an error.
682        }
683
684        // Delete requested fields from the data object.
685        let mut fields_removed = 0usize;
686        for field in fields {
687            if data_obj.remove(field.as_str()).is_some() {
688                fields_removed += 1;
689            }
690        }
691
692        // (g) Write mutated value to .tmp staging file.
693        let tmp = target.with_extension("json.tmp");
694        let serialized = serde_json::to_string_pretty(&value)?;
695        fs::write(&tmp, &serialized).map_err(StateError::IoWrite)?;
696
697        // (h) Atomic rename: .tmp → live file.
698        fs::rename(&tmp, &target).map_err(StateError::IoWrite)?;
699
700        Ok(ResetReport {
701            backup_path: bak_path,
702            steps_removed,
703            fields_removed,
704        })
705    }
706
707    fn load(&self, ns: &str) -> Result<HashMap<String, Value>, String> {
708        let path = self.state_path(ns)?;
709        if !path.exists() {
710            return Ok(HashMap::new());
711        }
712        let content =
713            fs::read_to_string(&path).map_err(|e| format!("Failed to read state '{ns}': {e}"))?;
714        serde_json::from_str(&content).map_err(|e| format!("Failed to parse state '{ns}': {e}"))
715    }
716
717    fn save(&self, ns: &str, data: &HashMap<String, Value>) -> Result<(), String> {
718        let path = self.state_path(ns)?;
719        let tmp = path.with_extension("json.tmp");
720        let content = serde_json::to_string_pretty(data)
721            .map_err(|e| format!("Failed to serialize state: {e}"))?;
722        fs::write(&tmp, &content).map_err(|e| format!("Failed to write state tmp: {e}"))?;
723        fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename state file: {e}"))?;
724        Ok(())
725    }
726}
727
728impl StateStore for JsonFileStore {
729    fn get(&self, ns: &str, key: &str) -> Result<Option<Value>, String> {
730        // Dispatched path takes precedence — when present, that file is
731        // the canonical source. Falls back to the legacy `{ns}.json`
732        // store so existing entries written before dispatch was enabled
733        // remain readable without migration.
734        if let Some(dpath) = self.dispatch_path(key)? {
735            let lock = self.ns_lock(&dpath)?;
736            let _guard = lock
737                .lock()
738                .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
739            if let Some(v) = self.load_dispatched(&dpath)? {
740                return Ok(Some(v));
741            }
742            // Fall through to legacy lookup so pre-dispatch values remain
743            // visible until the next set() promotes them.
744        }
745        let path = self.state_path(ns)?;
746        let lock = self.ns_lock(&path)?;
747        let _guard = lock
748            .lock()
749            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
750        let state = self.load(ns)?;
751        Ok(state.get(key).cloned())
752    }
753
754    fn set(&self, ns: &str, key: &str, value: Value) -> Result<(), String> {
755        if let Some(dpath) = self.dispatch_path(key)? {
756            let lock = self.ns_lock(&dpath)?;
757            let _guard = lock
758                .lock()
759                .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
760            return self.save_dispatched(&dpath, &value);
761        }
762        let path = self.state_path(ns)?;
763        let lock = self.ns_lock(&path)?;
764        let _guard = lock
765            .lock()
766            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
767        let mut state = self.load(ns)?;
768        state.insert(key.to_string(), value);
769        self.save(ns, &state)
770    }
771
772    fn delete(&self, ns: &str, key: &str) -> Result<bool, String> {
773        if let Some(dpath) = self.dispatch_path(key)? {
774            let lock = self.ns_lock(&dpath)?;
775            let _guard = lock
776                .lock()
777                .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
778            if dpath.exists() {
779                fs::remove_file(&dpath).map_err(|e| {
780                    format!(
781                        "Failed to delete dispatched state '{}': {e}",
782                        dpath.display()
783                    )
784                })?;
785                return Ok(true);
786            }
787            // Fall through to legacy delete in case the entry only exists
788            // in the legacy single-file store.
789        }
790        let path = self.state_path(ns)?;
791        let lock = self.ns_lock(&path)?;
792        let _guard = lock
793            .lock()
794            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
795        let mut state = self.load(ns)?;
796        let existed = state.remove(key).is_some();
797        if existed {
798            self.save(ns, &state)?;
799        }
800        Ok(existed)
801    }
802
803    fn keys(&self, ns: &str) -> Result<Vec<String>, String> {
804        let path = self.state_path(ns)?;
805        let lock = self.ns_lock(&path)?;
806        let _guard = lock
807            .lock()
808            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
809        let state = self.load(ns)?;
810        Ok(state.keys().cloned().collect())
811    }
812
813    fn has(&self, ns: &str, key: &str) -> Result<bool, String> {
814        if let Some(dpath) = self.dispatch_path(key)? {
815            let lock = self.ns_lock(&dpath)?;
816            let _guard = lock
817                .lock()
818                .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
819            if dpath.exists() {
820                return Ok(true);
821            }
822            // Fall through to legacy check.
823        }
824        let path = self.state_path(ns)?;
825        let lock = self.ns_lock(&path)?;
826        let _guard = lock
827            .lock()
828            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
829        let state = self.load(ns)?;
830        Ok(state.contains_key(key))
831    }
832
833    fn set_nx(&self, ns: &str, key: &str, value: Value) -> Result<bool, String> {
834        if let Some(dpath) = self.dispatch_path(key)? {
835            let lock = self.ns_lock(&dpath)?;
836            let _guard = lock
837                .lock()
838                .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
839            if dpath.exists() {
840                return Ok(false);
841            }
842            // Also honour any legacy entry to preserve set_nx semantics
843            // across the migration boundary.
844            let path = self.state_path(ns)?;
845            if path.exists() {
846                let state = self.load(ns)?;
847                if state.contains_key(key) {
848                    return Ok(false);
849                }
850            }
851            self.save_dispatched(&dpath, &value)?;
852            return Ok(true);
853        }
854        let path = self.state_path(ns)?;
855        let lock = self.ns_lock(&path)?;
856        let _guard = lock
857            .lock()
858            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
859        let mut state = self.load(ns)?;
860        if state.contains_key(key) {
861            return Ok(false);
862        }
863        state.insert(key.to_string(), value);
864        self.save(ns, &state)?;
865        Ok(true)
866    }
867
868    fn incr(&self, ns: &str, key: &str, delta: f64, default: f64) -> Result<f64, String> {
869        if let Some(dpath) = self.dispatch_path(key)? {
870            let lock = self.ns_lock(&dpath)?;
871            let _guard = lock
872                .lock()
873                .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
874            let current = if let Some(v) = self.load_dispatched(&dpath)? {
875                v.as_f64()
876                    .ok_or_else(|| format!("incr: value at '{key}' is not a number"))?
877            } else {
878                // Fall back to any legacy value so incr stays monotonic
879                // across the dispatch transition.
880                let path = self.state_path(ns)?;
881                if path.exists() {
882                    let state = self.load(ns)?;
883                    match state.get(key) {
884                        Some(v) => v
885                            .as_f64()
886                            .ok_or_else(|| format!("incr: value at '{key}' is not a number"))?,
887                        None => default,
888                    }
889                } else {
890                    default
891                }
892            };
893            let new_val = current + delta;
894            self.save_dispatched(&dpath, &serde_json::json!(new_val))?;
895            return Ok(new_val);
896        }
897        let path = self.state_path(ns)?;
898        let lock = self.ns_lock(&path)?;
899        let _guard = lock
900            .lock()
901            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
902        let mut state = self.load(ns)?;
903        let current = match state.get(key) {
904            Some(v) => v
905                .as_f64()
906                .ok_or_else(|| format!("incr: value at '{key}' is not a number"))?,
907            None => default,
908        };
909        let new_val = current + delta;
910        state.insert(key.to_string(), serde_json::json!(new_val));
911        self.save(ns, &state)?;
912        Ok(new_val)
913    }
914}
915
916#[cfg(test)]
917mod tests {
918    use super::*;
919    use tempfile::TempDir;
920
921    /// Create a JsonFileStore rooted in a fresh tempdir, returning both
922    /// so the TempDir guard lives for the test duration.
923    fn new_store() -> (JsonFileStore, TempDir) {
924        let tmp = tempfile::tempdir().unwrap();
925        let store = JsonFileStore::new(tmp.path().to_path_buf());
926        (store, tmp)
927    }
928
929    #[test]
930    fn roundtrip() {
931        let (store, _tmp) = new_store();
932        let ns = "rt";
933
934        store.set(ns, "count", serde_json::json!(42)).unwrap();
935        store
936            .set(ns, "name", serde_json::json!("algocline"))
937            .unwrap();
938
939        assert_eq!(store.get(ns, "count").unwrap(), Some(serde_json::json!(42)));
940        assert_eq!(
941            store.get(ns, "name").unwrap(),
942            Some(serde_json::json!("algocline"))
943        );
944        assert_eq!(store.get(ns, "missing").unwrap(), None);
945
946        let k = store.keys(ns).unwrap();
947        assert!(k.contains(&"count".to_string()));
948        assert!(k.contains(&"name".to_string()));
949
950        assert!(store.delete(ns, "count").unwrap());
951        assert!(!store.delete(ns, "count").unwrap());
952        assert_eq!(store.get(ns, "count").unwrap(), None);
953    }
954
955    #[test]
956    fn invalid_namespace() {
957        let (store, _tmp) = new_store();
958        assert!(store.state_path("../evil").is_err());
959        assert!(store.state_path("foo/bar").is_err());
960        assert!(store.state_path("foo\\bar").is_err());
961        assert!(store.state_path("").is_err());
962        assert!(store.state_path("foo\0bar").is_err());
963    }
964
965    #[test]
966    fn get_nonexistent_namespace_returns_empty() {
967        let (store, _tmp) = new_store();
968        let result = store.get("ghost_ns", "any_key").unwrap();
969        assert_eq!(result, None);
970    }
971
972    #[test]
973    fn keys_nonexistent_namespace_returns_empty() {
974        let (store, _tmp) = new_store();
975        let result = store.keys("ghost_ns").unwrap();
976        assert!(result.is_empty());
977    }
978
979    #[test]
980    fn delete_nonexistent_key_returns_false() {
981        let (store, _tmp) = new_store();
982        assert!(!store.delete("delns", "nope").unwrap());
983    }
984
985    #[test]
986    fn set_overwrites_existing_value() {
987        let (store, _tmp) = new_store();
988        let ns = "ow";
989
990        store.set(ns, "k", serde_json::json!(1)).unwrap();
991        store.set(ns, "k", serde_json::json!(2)).unwrap();
992        assert_eq!(store.get(ns, "k").unwrap(), Some(serde_json::json!(2)));
993    }
994
995    #[test]
996    fn state_path_valid_namespaces() {
997        let (store, _tmp) = new_store();
998        assert!(store.state_path("default").is_ok());
999        assert!(store.state_path("my-app").is_ok());
1000        assert!(store.state_path("test_123").is_ok());
1001    }
1002
1003    // ─── Tier 1: has / set_nx / incr ──────────────────────────
1004
1005    #[test]
1006    fn has_returns_existence() {
1007        let (store, _tmp) = new_store();
1008        let ns = "hasns";
1009
1010        assert!(!store.has(ns, "x").unwrap());
1011        store.set(ns, "x", serde_json::json!(1)).unwrap();
1012        assert!(store.has(ns, "x").unwrap());
1013    }
1014
1015    #[test]
1016    fn set_nx_only_sets_if_absent() {
1017        let (store, _tmp) = new_store();
1018        let ns = "snx";
1019
1020        assert!(store.set_nx(ns, "k", serde_json::json!("first")).unwrap());
1021        assert!(!store.set_nx(ns, "k", serde_json::json!("second")).unwrap());
1022        assert_eq!(
1023            store.get(ns, "k").unwrap(),
1024            Some(serde_json::json!("first")),
1025            "set_nx should not overwrite"
1026        );
1027    }
1028
1029    #[test]
1030    fn incr_initialises_and_increments() {
1031        let (store, _tmp) = new_store();
1032        let ns = "inc";
1033
1034        // Missing key: initialise from default (0) + delta (1) = 1
1035        let v = store.incr(ns, "counter", 1.0, 0.0).unwrap();
1036        assert!((v - 1.0).abs() < f64::EPSILON);
1037
1038        // Increment existing
1039        let v = store.incr(ns, "counter", 5.0, 0.0).unwrap();
1040        assert!((v - 6.0).abs() < f64::EPSILON);
1041
1042        // Negative delta
1043        let v = store.incr(ns, "counter", -2.0, 0.0).unwrap();
1044        assert!((v - 4.0).abs() < f64::EPSILON);
1045    }
1046
1047    #[test]
1048    fn incr_rejects_non_numeric() {
1049        let (store, _tmp) = new_store();
1050        let ns = "incerr";
1051
1052        store.set(ns, "s", serde_json::json!("hello")).unwrap();
1053        let err = store.incr(ns, "s", 1.0, 0.0).unwrap_err();
1054        assert!(err.contains("not a number"), "got: {err}");
1055    }
1056
1057    #[test]
1058    fn incr_custom_default() {
1059        let (store, _tmp) = new_store();
1060        let ns = "incdef";
1061
1062        let v = store.incr(ns, "score", 10.0, 100.0).unwrap();
1063        assert!((v - 110.0).abs() < f64::EPSILON, "100 + 10 = 110");
1064    }
1065
1066    // ─── Per-key dispatch (issue #1776868812) ─────────────────────────
1067
1068    /// Keys shaped `{prefix}:{id}` with safe segments are written to
1069    /// `{root}/{prefix}/{id}.json` rather than crammed into the legacy
1070    /// `{ns}.json` SSoT.
1071    #[test]
1072    fn dispatch_writes_to_per_key_file_for_prefix_id_keys() {
1073        let (store, tmp) = new_store();
1074        store
1075            .set(
1076                "default",
1077                "flow_orch:abc-123",
1078                serde_json::json!({"step": 1}),
1079            )
1080            .unwrap();
1081        let dispatched = tmp.path().join("flow_orch").join("abc-123.json");
1082        assert!(
1083            dispatched.exists(),
1084            "dispatched file must exist at {}",
1085            dispatched.display()
1086        );
1087        // Legacy file must NOT have been touched for this key.
1088        let legacy = tmp.path().join("default.json");
1089        assert!(
1090            !legacy.exists(),
1091            "legacy default.json must not be created for dispatched keys"
1092        );
1093    }
1094
1095    /// Read path: dispatched file takes precedence; legacy `{ns}.json`
1096    /// is consulted only when the dispatched file is absent (so
1097    /// pre-dispatch entries remain readable without migration).
1098    #[test]
1099    fn dispatch_read_falls_back_to_legacy_for_unmigrated_entries() {
1100        let (store, tmp) = new_store();
1101        // Pre-populate the legacy default.json by writing a key without
1102        // a `:` (forces the legacy path) then manually inject the
1103        // dispatched-shaped key into the same file to simulate a state
1104        // produced before dispatch was enabled.
1105        store
1106            .set("default", "boot_marker", serde_json::json!(true))
1107            .unwrap();
1108        let legacy_path = tmp.path().join("default.json");
1109        let mut existing: HashMap<String, Value> =
1110            serde_json::from_str(&std::fs::read_to_string(&legacy_path).unwrap()).unwrap();
1111        existing.insert(
1112            "flow_legacy:xyz".to_string(),
1113            serde_json::json!({"old": "value"}),
1114        );
1115        std::fs::write(
1116            &legacy_path,
1117            serde_json::to_string_pretty(&existing).unwrap(),
1118        )
1119        .unwrap();
1120
1121        // Read returns the legacy value because no dispatched file exists.
1122        assert_eq!(
1123            store.get("default", "flow_legacy:xyz").unwrap(),
1124            Some(serde_json::json!({"old": "value"})),
1125            "must fall back to legacy default.json when dispatched file absent"
1126        );
1127
1128        // Once we set a new value, it lands in the dispatched file and
1129        // future reads see the new value (legacy entry is shadowed).
1130        store
1131            .set(
1132                "default",
1133                "flow_legacy:xyz",
1134                serde_json::json!({"new": "promoted"}),
1135            )
1136            .unwrap();
1137        assert!(
1138            tmp.path().join("flow_legacy").join("xyz.json").exists(),
1139            "set() must promote dispatched-shaped keys to per-key file"
1140        );
1141        assert_eq!(
1142            store.get("default", "flow_legacy:xyz").unwrap(),
1143            Some(serde_json::json!({"new": "promoted"})),
1144            "dispatched file must shadow legacy entry on subsequent reads"
1145        );
1146    }
1147
1148    /// Keys without a `:` separator (or with unsafe characters in either
1149    /// segment) bypass dispatch and use the legacy single-file store.
1150    #[test]
1151    fn dispatch_skips_keys_without_colon_or_with_unsafe_segments() {
1152        let (store, tmp) = new_store();
1153        store
1154            .set("default", "no_colon", serde_json::json!(1))
1155            .unwrap();
1156        store
1157            .set("default", "bad/prefix:id", serde_json::json!(2))
1158            .unwrap();
1159        store
1160            .set("default", "prefix:bad/id", serde_json::json!(3))
1161            .unwrap();
1162        store
1163            .set("default", "prefix:..", serde_json::json!(4))
1164            .unwrap();
1165        // All four go to legacy default.json.
1166        let legacy = tmp.path().join("default.json");
1167        let raw: HashMap<String, Value> =
1168            serde_json::from_str(&std::fs::read_to_string(&legacy).unwrap()).unwrap();
1169        assert_eq!(raw.get("no_colon"), Some(&serde_json::json!(1)));
1170        assert_eq!(raw.get("bad/prefix:id"), Some(&serde_json::json!(2)));
1171        assert_eq!(raw.get("prefix:bad/id"), Some(&serde_json::json!(3)));
1172        assert_eq!(raw.get("prefix:.."), Some(&serde_json::json!(4)));
1173        // No subdirectories were created.
1174        assert!(!tmp.path().join("bad").exists());
1175        assert!(!tmp.path().join("prefix").exists());
1176    }
1177
1178    /// `delete` removes the dispatched file and returns `true`.
1179    #[test]
1180    fn dispatch_delete_removes_per_key_file() {
1181        let (store, tmp) = new_store();
1182        store.set("default", "p:q", serde_json::json!("v")).unwrap();
1183        let dispatched = tmp.path().join("p").join("q.json");
1184        assert!(
1185            dispatched.exists(),
1186            "dispatched file should exist before delete"
1187        );
1188        assert!(store.delete("default", "p:q").unwrap());
1189        assert!(
1190            !dispatched.exists(),
1191            "dispatched file should be removed after delete"
1192        );
1193        // Re-deleting returns false.
1194        assert!(!store.delete("default", "p:q").unwrap());
1195    }
1196
1197    /// `has` reflects dispatched file existence.
1198    #[test]
1199    fn dispatch_has_reports_dispatched_file_existence() {
1200        let (store, _tmp) = new_store();
1201        assert!(!store.has("default", "p:q").unwrap());
1202        store.set("default", "p:q", serde_json::json!("v")).unwrap();
1203        assert!(store.has("default", "p:q").unwrap());
1204    }
1205
1206    /// `set_nx` honours both the dispatched file and any legacy entry
1207    /// to keep set-if-not-exists semantics consistent across migration.
1208    #[test]
1209    fn dispatch_set_nx_blocks_when_legacy_or_dispatched_entry_exists() {
1210        let (store, tmp) = new_store();
1211        // Inject a legacy entry under the dispatched-shaped key.
1212        store
1213            .set("default", "boot", serde_json::json!(true))
1214            .unwrap();
1215        let legacy_path = tmp.path().join("default.json");
1216        let mut existing: HashMap<String, Value> =
1217            serde_json::from_str(&std::fs::read_to_string(&legacy_path).unwrap()).unwrap();
1218        existing.insert("p:q".to_string(), serde_json::json!("legacy_only"));
1219        std::fs::write(
1220            &legacy_path,
1221            serde_json::to_string_pretty(&existing).unwrap(),
1222        )
1223        .unwrap();
1224        // set_nx must refuse because the legacy entry exists.
1225        assert!(!store
1226            .set_nx("default", "p:q", serde_json::json!("new"))
1227            .unwrap());
1228
1229        // For a fresh dispatched-shaped key with no legacy entry, set_nx
1230        // creates the dispatched file and returns true; second call
1231        // returns false because the dispatched file now exists.
1232        assert!(store
1233            .set_nx("default", "p:r", serde_json::json!("first"))
1234            .unwrap());
1235        assert!(tmp.path().join("p").join("r.json").exists());
1236        assert!(!store
1237            .set_nx("default", "p:r", serde_json::json!("second"))
1238            .unwrap());
1239    }
1240
1241    /// `incr` operates on the dispatched file when the key matches the
1242    /// dispatch pattern; legacy values are migrated forward on the
1243    /// first call.
1244    #[test]
1245    fn dispatch_incr_promotes_legacy_value_on_first_call() {
1246        let (store, tmp) = new_store();
1247        // Pre-populate a legacy numeric value under a dispatched-shaped key.
1248        store.set("default", "seed", serde_json::json!(0)).unwrap();
1249        let legacy_path = tmp.path().join("default.json");
1250        let mut existing: HashMap<String, Value> =
1251            serde_json::from_str(&std::fs::read_to_string(&legacy_path).unwrap()).unwrap();
1252        existing.insert("counter:cnt".to_string(), serde_json::json!(7));
1253        std::fs::write(
1254            &legacy_path,
1255            serde_json::to_string_pretty(&existing).unwrap(),
1256        )
1257        .unwrap();
1258
1259        // First incr: reads legacy value (7), writes new value (10) to
1260        // the dispatched file.
1261        let result = store.incr("default", "counter:cnt", 3.0, 0.0).unwrap();
1262        assert_eq!(result, 10.0);
1263        let dispatched = tmp.path().join("counter").join("cnt.json");
1264        assert!(dispatched.exists(), "dispatched file must be created");
1265
1266        // Second incr: reads dispatched (10), writes 12.
1267        let result2 = store.incr("default", "counter:cnt", 2.0, 0.0).unwrap();
1268        assert_eq!(result2, 12.0);
1269    }
1270
1271    /// `is_safe_segment` accepts alphanumerics + `_-.` and rejects
1272    /// path traversal sequences and reserved names.
1273    #[test]
1274    fn is_safe_segment_validates_path_safety() {
1275        assert!(is_safe_segment("flow_orch"));
1276        assert!(is_safe_segment("abc-123"));
1277        assert!(is_safe_segment("v1.2.3"));
1278        assert!(!is_safe_segment(""));
1279        assert!(!is_safe_segment("."));
1280        assert!(!is_safe_segment(".."));
1281        assert!(!is_safe_segment("a..b"));
1282        assert!(!is_safe_segment("a/b"));
1283        assert!(!is_safe_segment("a\\b"));
1284        assert!(!is_safe_segment("a b"));
1285        assert!(!is_safe_segment("a\0b"));
1286    }
1287
1288    // ─── Dispatched-layout helpers ─────────────────────────────────────────
1289
1290    mod dispatched_layout {
1291        use super::*;
1292
1293        /// Helper: write a JSON file directly into `{tmp}/{ns}/{key}.json`,
1294        /// creating the parent directory if needed.
1295        fn seed(tmp: &TempDir, ns: &str, key: &str, value: serde_json::Value) {
1296            let dir = tmp.path().join(ns);
1297            // Safe: test-only helper, directory creation cannot fail in practice
1298            fs::create_dir_all(&dir).expect("create ns dir");
1299            let path = dir.join(format!("{key}.json"));
1300            fs::write(
1301                path,
1302                serde_json::to_string_pretty(&value).expect("serialize"),
1303            )
1304            .expect("write seed file");
1305        }
1306
1307        /// `list_dispatched` returns only `.json` files and strips the suffix.
1308        #[test]
1309        fn list_returns_json_keys_only() {
1310            let (store, tmp) = new_store();
1311            seed(&tmp, "myns", "alpha", serde_json::json!(1));
1312            seed(&tmp, "myns", "beta", serde_json::json!(2));
1313            // Place non-.json and sibling files that must be excluded.
1314            let ns_dir = tmp.path().join("myns");
1315            fs::write(ns_dir.join("alpha.json.bak"), b"backup").expect("write bak");
1316            fs::write(ns_dir.join("alpha.json.tmp"), b"tmp").expect("write tmp");
1317            fs::write(ns_dir.join("notes.txt"), b"text").expect("write txt");
1318
1319            let keys = store.list_dispatched("myns").unwrap();
1320            assert_eq!(
1321                keys,
1322                vec!["alpha", "beta"],
1323                "must be sorted, .bak/.tmp/.txt excluded"
1324            );
1325        }
1326
1327        /// `list_dispatched` returns an empty Vec when the namespace directory
1328        /// does not exist (no error).
1329        #[test]
1330        fn list_returns_empty_for_absent_namespace() {
1331            let (store, _tmp) = new_store();
1332            let keys = store.list_dispatched("ghost").unwrap();
1333            assert!(keys.is_empty(), "absent namespace should return empty Vec");
1334        }
1335
1336        /// `list_dispatched` handles a namespace directory that exists but
1337        /// contains only non-`.json` files.
1338        #[test]
1339        fn list_returns_empty_when_only_non_json_files_present() {
1340            let (store, tmp) = new_store();
1341            let ns_dir = tmp.path().join("empty_ns");
1342            // Safe: test setup
1343            fs::create_dir_all(&ns_dir).expect("create dir");
1344            fs::write(ns_dir.join("readme.txt"), b"hi").expect("write");
1345            let keys = store.list_dispatched("empty_ns").unwrap();
1346            assert!(keys.is_empty());
1347        }
1348
1349        /// `show_dispatched` returns `KeyNotFound` when the namespace
1350        /// directory itself does not exist.
1351        #[test]
1352        fn show_returns_key_not_found_for_absent_namespace() {
1353            let (store, _tmp) = new_store();
1354            let err = store.show_dispatched("nodir", "anykey").unwrap_err();
1355            assert!(
1356                matches!(err, StateError::KeyNotFound { .. }),
1357                "expected KeyNotFound, got: {err}"
1358            );
1359            // Confirm the message contains "not found" as specified by the error format.
1360            assert!(err.to_string().contains("not found"), "{err}");
1361        }
1362
1363        /// `show_dispatched` returns `KeyNotFound` when the namespace
1364        /// directory exists but the key file is absent.
1365        #[test]
1366        fn show_returns_key_not_found_for_absent_key() {
1367            let (store, tmp) = new_store();
1368            // Create the namespace directory but not the key file.
1369            let ns_dir = tmp.path().join("myns2");
1370            // Safe: test setup
1371            fs::create_dir_all(&ns_dir).expect("create dir");
1372
1373            let err = store.show_dispatched("myns2", "missing").unwrap_err();
1374            assert!(
1375                matches!(err, StateError::KeyNotFound { .. }),
1376                "expected KeyNotFound, got: {err}"
1377            );
1378        }
1379
1380        /// `show_dispatched` returns the full JSON value when the key exists.
1381        #[test]
1382        fn show_returns_full_value_for_existing_key() {
1383            let (store, tmp) = new_store();
1384            let expected = serde_json::json!({"data": {"completed_steps": ["a", "b"], "x": 42}});
1385            seed(&tmp, "showns", "task1", expected.clone());
1386
1387            let result = store.show_dispatched("showns", "task1").unwrap();
1388            assert_eq!(result, expected);
1389        }
1390    }
1391
1392    mod reset_atomicity {
1393        use super::*;
1394
1395        /// Helper: write a JSON file directly into `{tmp}/{ns}/{key}.json`.
1396        fn seed(tmp: &TempDir, ns: &str, key: &str, value: serde_json::Value) {
1397            let dir = tmp.path().join(ns);
1398            // Safe: test setup
1399            fs::create_dir_all(&dir).expect("create ns dir");
1400            let path = dir.join(format!("{key}.json"));
1401            fs::write(
1402                path,
1403                serde_json::to_string_pretty(&value).expect("serialize"),
1404            )
1405            .expect("write seed");
1406        }
1407
1408        /// Reset removes specified steps and fields; backup file contains the
1409        /// original content; report reflects what was removed.
1410        #[test]
1411        fn reset_removes_steps_and_fields_and_creates_backup() {
1412            let (store, tmp) = new_store();
1413            let original = serde_json::json!({
1414                "data": {
1415                    "completed_steps": ["a", "b", "c"],
1416                    "x": 1,
1417                    "y": "hello"
1418                }
1419            });
1420            seed(&tmp, "testns", "task1", original.clone());
1421
1422            let report = store
1423                .reset_dispatched_with_backup(
1424                    "testns",
1425                    "task1",
1426                    &["b".to_string()],
1427                    &["x".to_string()],
1428                )
1429                .unwrap();
1430
1431            // Backup must exist and contain original content.
1432            let bak_path = tmp.path().join("testns").join("task1.json.bak");
1433            assert!(
1434                bak_path.exists(),
1435                ".bak file must exist at {}",
1436                bak_path.display()
1437            );
1438            assert_eq!(report.backup_path, bak_path);
1439            let bak_content: serde_json::Value =
1440                serde_json::from_str(&fs::read_to_string(&bak_path).expect("read bak"))
1441                    .expect("parse bak");
1442            assert_eq!(bak_content, original, ".bak must contain original content");
1443
1444            // Live file must reflect mutations.
1445            let live_path = tmp.path().join("testns").join("task1.json");
1446            let live_content: serde_json::Value =
1447                serde_json::from_str(&fs::read_to_string(&live_path).expect("read live"))
1448                    .expect("parse live");
1449            let expected = serde_json::json!({
1450                "data": {
1451                    "completed_steps": ["a", "c"],
1452                    "y": "hello"
1453                }
1454            });
1455            assert_eq!(live_content, expected, "live file must be mutated");
1456
1457            // Report counts.
1458            assert_eq!(report.steps_removed, 1, "one step removed");
1459            assert_eq!(report.fields_removed, 1, "one field removed");
1460        }
1461
/// Resetting several steps and several fields in one call prunes exactly
/// those entries from the live file and reports matching counts.
///
/// Four completed steps and three data fields are seeded; two steps and
/// two fields are named in the reset request.  The entries that were not
/// named (`s1`, `s4`, `plan_gate_retries`) must still be present afterwards.
#[test]
fn reset_removes_multiple_steps_and_fields() {
    let (store, tmp) = new_store();
    // The fixture is handed to `seed` by value: it is never read again,
    // so cloning it first would only produce a throwaway deep copy.
    let fixture = serde_json::json!({
        "data": {
            "completed_steps": ["s1", "s2", "s3", "s4"],
            "repo_readiness": "NOT_READY",
            "repo_readiness_report": "details here",
            "plan_gate_retries": 2
        }
    });
    seed(&tmp, "orchns", "task-abc", fixture);

    let steps = ["s2", "s3"].map(String::from);
    let fields = ["repo_readiness", "repo_readiness_report"].map(String::from);
    let report = store
        .reset_dispatched_with_backup("orchns", "task-abc", &steps, &fields)
        .unwrap();

    // Re-read the live file from disk instead of relying on the report alone.
    let live_path = tmp.path().join("orchns").join("task-abc.json");
    let raw = fs::read_to_string(&live_path).expect("read");
    let live: serde_json::Value = serde_json::from_str(&raw).expect("parse");
    let data = &live["data"];

    assert_eq!(data["completed_steps"], serde_json::json!(["s1", "s4"]));
    // Every field named in the request must be gone; iterating the same
    // list that was passed to the call keeps request and check in sync.
    for gone in &fields {
        assert!(
            data.get(gone.as_str()).is_none(),
            "field `{gone}` must be removed"
        );
    }
    assert_eq!(data["plan_gate_retries"], 2);

    assert_eq!(report.steps_removed, 2);
    assert_eq!(report.fields_removed, 2);
}
1503
/// A reset aimed at a namespace/key pair that was never written fails
/// with `KeyNotFound`.
#[test]
fn reset_returns_key_not_found_for_absent_file() {
    // No state is seeded before the call.
    let (store, _tmp) = new_store();
    let outcome = store.reset_dispatched_with_backup("ns", "missing", &[], &[]);
    match outcome.unwrap_err() {
        StateError::KeyNotFound { .. } => {}
        other => panic!("expected KeyNotFound, got: {other}"),
    }
}
1516
/// A state file without the top-level `data` field makes reset fail with
/// `ShapeInvalid`, and the rendered error mentions `data`.
#[test]
fn reset_returns_shape_invalid_when_data_absent() {
    let (store, tmp) = new_store();

    // Write the malformed document by hand: it carries only "identity"
    // and deliberately omits "data".
    let ns_dir = tmp.path().join("badns");
    fs::create_dir_all(&ns_dir).expect("create dir");
    let malformed = serde_json::json!({"identity": {"task_id": "t1"}});
    let payload = serde_json::to_string_pretty(&malformed).expect("ser");
    fs::write(ns_dir.join("k.json"), payload).expect("write");

    let steps = ["s".to_string()];
    let err = store
        .reset_dispatched_with_backup("badns", "k", &steps, &[])
        .unwrap_err();

    // First the variant, then the human-readable message.
    match &err {
        StateError::ShapeInvalid { .. } => {}
        other => panic!("expected ShapeInvalid, got: {other}"),
    }
    let rendered = err.to_string();
    assert!(rendered.contains("data"), "{rendered}");
}
1541
/// When `data.completed_steps` holds something other than an array,
/// reset fails with `ShapeInvalid` and the rendered error mentions
/// `completed_steps`.
#[test]
fn reset_returns_shape_invalid_when_completed_steps_not_array() {
    let (store, tmp) = new_store();

    // Write the malformed document by hand: `completed_steps` is an
    // object here instead of an array.
    let ns_dir = tmp.path().join("badns2");
    fs::create_dir_all(&ns_dir).expect("create dir");
    let malformed = serde_json::json!({"data": {"completed_steps": {"step": "a"}}});
    let payload = serde_json::to_string_pretty(&malformed).expect("ser");
    fs::write(ns_dir.join("k.json"), payload).expect("write");

    let steps = ["a".to_string()];
    let err = store
        .reset_dispatched_with_backup("badns2", "k", &steps, &[])
        .unwrap_err();

    // First the variant, then the human-readable message.
    match &err {
        StateError::ShapeInvalid { .. } => {}
        other => panic!("expected ShapeInvalid, got: {other}"),
    }
    let rendered = err.to_string();
    assert!(
        rendered.contains("completed_steps"),
        "message should mention completed_steps: {rendered}"
    );
}
1570    }
1571
/// Checks that the dispatched-state entry points refuse namespace and key
/// segments that could escape the store root, reporting which segment
/// was at fault through `StateError::UnsafeSegment { which, .. }`.
mod path_traversal {
    use super::*;

    /// A namespace containing `../` is refused by `list_dispatched`.
    #[test]
    fn list_rejects_unsafe_namespace() {
        let (store, _tmp) = new_store();
        match store.list_dispatched("../evil").unwrap_err() {
            StateError::UnsafeSegment {
                which: "namespace", ..
            } => {}
            other => panic!("expected UnsafeSegment{{namespace}}, got: {other}"),
        }
    }

    /// A key containing a path separator is refused by `show_dispatched`.
    #[test]
    fn show_rejects_unsafe_key() {
        let (store, _tmp) = new_store();
        match store.show_dispatched("ns", "foo/bar").unwrap_err() {
            StateError::UnsafeSegment { which: "key", .. } => {}
            other => panic!("expected UnsafeSegment{{key}}, got: {other}"),
        }
    }

    /// An empty namespace is refused by `reset_dispatched_with_backup`.
    #[test]
    fn reset_rejects_empty_namespace() {
        let (store, _tmp) = new_store();
        let outcome = store.reset_dispatched_with_backup("", "key", &[], &[]);
        match outcome.unwrap_err() {
            StateError::UnsafeSegment {
                which: "namespace", ..
            } => {}
            other => panic!("expected UnsafeSegment{{namespace}}, got: {other}"),
        }
    }

    /// A key that is exactly `..` is refused by
    /// `reset_dispatched_with_backup`.
    #[test]
    fn reset_rejects_dotdot_key() {
        let (store, _tmp) = new_store();
        let outcome = store.reset_dispatched_with_backup("ns", "..", &[], &[]);
        match outcome.unwrap_err() {
            StateError::UnsafeSegment { which: "key", .. } => {}
            other => panic!("expected UnsafeSegment{{key}}, got: {other}"),
        }
    }
}
1635}
1636
/// Property-based tests for [`JsonFileStore`]: value round-tripping and
/// rejection of hostile namespace strings by `state_path`.
#[cfg(test)]
mod proptests {
    use super::*;
    use proptest::prelude::*;

    /// Builds a [`JsonFileStore`] rooted at a newly created temporary
    /// directory.
    ///
    /// The `TempDir` handle is returned next to the store so the caller
    /// can keep it bound for as long as the store is in use.
    fn new_store() -> (JsonFileStore, tempfile::TempDir) {
        let tmp = tempfile::tempdir().expect("temporary directory must be creatable for tests");
        let store = JsonFileStore::new(tmp.path().to_path_buf());
        (store, tmp)
    }

    proptest! {
        /// Any lowercase ASCII key (1–20 chars) paired with any `i64`
        /// value round-trips through set/get.  The namespace is fixed
        /// (`"rt"`); only the key and the value vary.
        #[test]
        fn roundtrip_arbitrary_values(
            key in "[a-z]{1,20}",
            val in any::<i64>(),
        ) {
            let (store, _tmp) = new_store();
            let ns = "rt";
            let json_val = serde_json::json!(val);
            store.set(ns, &key, json_val.clone()).unwrap();
            let got = store.get(ns, &key).unwrap();
            prop_assert_eq!(got, Some(json_val));
            // Best-effort cleanup: the outcome of the delete is not part
            // of this property, so it is deliberately ignored.
            let _ = store.delete(ns, &key);
        }

        /// Namespaces of the form `{prefix}/../{suffix}` are always
        /// rejected by `state_path`, including when either side is empty.
        #[test]
        fn traversal_always_rejected(
            prefix in "[a-z]{0,5}",
            suffix in "[a-z]{0,5}",
        ) {
            let (store, _tmp) = new_store();
            let evil = format!("{prefix}/../{suffix}");
            prop_assert!(store.state_path(&evil).is_err());
        }

        /// `state_path` rejects a NUL byte at any position in the
        /// namespace (start, middle, or end).
        #[test]
        fn nul_byte_always_rejected(
            prefix in "[a-z]{0,10}",
            suffix in "[a-z]{0,10}",
        ) {
            let (store, _tmp) = new_store();
            let evil = format!("{prefix}\0{suffix}");
            prop_assert!(store.state_path(&evil).is_err());
        }
    }
}