Skip to main content

algocline_engine/
state.rs

1//! Persistent key-value state backed by JSON files.
2//!
3//! ## Architecture
4//!
5//! All state operations go through the [`StateStore`] trait, which
6//! abstracts the storage backend.  The default implementation,
7//! [`JsonFileStore`], persists each namespace as a JSON file under a
8//! caller-provided root directory with atomic writes (tmp + rename).
9//!
10//! ## Tier 1 — Current API
11//!
12//! | Operation | Description |
13//! |-----------|-------------|
14//! | `get` | Read a value (returns `None` if absent) |
15//! | `set` | Write a value (upsert) |
16//! | `delete` | Remove a key (returns whether it existed) |
17//! | `keys` | List all keys in a namespace |
18//! | `has` | Check existence (cost is backend-dependent) |
19//! | `set_nx` | Set-if-not-exists (returns `false` if key already present) |
20//! | `incr` | Counter increment — single-process atomic (read-modify-write) |
21//!
22//! ## Tier 2 — Future Extensions (design notes, not yet implemented)
23//!
24//! The following operations are planned but **not yet implemented**.
25//! The trait is designed to accommodate them without breaking changes.
26//! Review this list when adding a new backend.
27//!
28//! - **TTL**: `set(key, value, opts)` with `opts.ttl_secs`, plus
29//!   `ttl(key) -> Option<Duration>` to query remaining time.  Useful
30//!   for caching patterns (e.g. Hub index cache, LLM response cache).
31//! - **Batch**: `mget(keys) -> Vec<Option<Value>>` and
32//!   `mset(pairs) -> Result<()>`.  Reduces I/O round-trips for
33//!   file/network backends.
34//! - **clear**: Flush all keys in a namespace.  OpenResty's
35//!   `flush_all` equivalent.
36//!
37//! ## Backend Swappability
38//!
39//! Because the engine interacts with state only through the
40//! [`StateStore`] trait, backends can be swapped without changing Lua
41//! code.  Planned backends:
42//!
43//! - `JsonFileStore` (current, default)
44//! - In-memory `HashMap` (for tests and short-lived sessions)
45//! - SQLite (for larger datasets with indexed queries)
46//! - Redis (for distributed / multi-process scenarios)
47
48use std::collections::HashMap;
49use std::fs;
50use std::path::{Path, PathBuf};
51use std::sync::{Arc, Mutex};
52
53use serde_json::Value;
54
55// ═══════════════════════════════════════════════════════════════
56// Typed error for dispatched-layout operations
57// ═══════════════════════════════════════════════════════════════
58
59/// Errors returned by the dispatched-layout helpers
60/// (`list_dispatched`, `show_dispatched`, `reset_dispatched_with_backup`).
61///
62/// Unlike the legacy [`StateStore`] trait methods (which return `String`
63/// errors), these helpers use a typed enum so callers can distinguish
64/// missing-key from I/O failure at the type level without pattern-matching
65/// on OS error codes.
66#[derive(thiserror::Error, Debug)]
67pub enum StateError {
68    /// The requested key does not exist in the given namespace.
69    ///
70    /// # Arguments
71    /// * `namespace` — the namespace that was searched
72    /// * `key` — the key that was not found
73    #[error("state: key '{key}' not found in namespace '{namespace}'")]
74    KeyNotFound { namespace: String, key: String },
75
76    /// A namespace or key segment failed the path-safety check.
77    ///
78    /// # Arguments
79    /// * `which` — either `"namespace"` or `"key"`
80    /// * `value` — the offending segment value
81    #[error("state: unsafe {which} segment '{value}'")]
82    UnsafeSegment { which: &'static str, value: String },
83
84    /// A backup I/O operation (`fs::copy` to `.bak`) failed.
85    ///
86    /// Wraps the underlying [`std::io::Error`].  Kept separate from
87    /// [`StateError::IoWrite`] so callers know that the live file was
88    /// not yet touched when this error occurs.
89    #[error("state: backup I/O failed: {0}")]
90    IoBackup(#[source] std::io::Error),
91
92    /// A read or directory-scan operation failed.
93    ///
94    /// Wraps the underlying [`std::io::Error`].  Covers `fs::read_to_string`,
95    /// `fs::read_dir`, and `DirEntry` iteration.
96    #[error("state: read failed: {0}")]
97    IoRead(#[source] std::io::Error),
98
99    /// A write or rename operation on the live file or its `.tmp`
100    /// staging copy failed.
101    ///
102    /// Wraps the underlying [`std::io::Error`].  Covers `fs::write` and
103    /// `fs::rename`.
104    #[error("state: write failed: {0}")]
105    IoWrite(#[source] std::io::Error),
106
107    /// JSON serialization or deserialization failed.
108    ///
109    /// Uses `#[from]` so `?` auto-converts `serde_json::Error`.
110    #[error("state: serialize/parse failed: {0}")]
111    Serde(#[from] serde_json::Error),
112
113    /// The stored JSON does not have the expected shape.
114    ///
115    /// # Arguments
116    /// * `reason` — human-readable description of the shape violation
117    ///   (e.g. `"missing 'data' top-level field"` or
118    ///   `"data.completed_steps must be an array"`)
119    #[error("state: shape invalid: {reason}")]
120    ShapeInvalid { reason: String },
121}
122
123// ═══════════════════════════════════════════════════════════════
124// ResetReport — return value of reset_dispatched_with_backup
125// ═══════════════════════════════════════════════════════════════
126
127/// Report returned by [`JsonFileStore::reset_dispatched_with_backup`]
128/// describing what was modified.
129#[derive(Debug, Clone)]
130pub struct ResetReport {
131    /// Path to the `.bak` snapshot created before the mutation.
132    pub backup_path: PathBuf,
133    /// Number of entries removed from `data.completed_steps`.
134    pub steps_removed: usize,
135    /// Number of keys deleted from the `data` top-level object.
136    pub fields_removed: usize,
137}
138
139/// Whether a string is a safe path segment for `dispatch_path`.
140///
141/// Accepts ASCII alphanumerics, `_`, `-`, and `.` (single dots only —
142/// path traversal `..` and reserved names `.` are rejected). Empty
143/// strings and any other character (slash, backslash, NUL, control
144/// chars, whitespace) cause dispatch to fall back to legacy single-file
145/// storage.
146fn is_safe_segment(s: &str) -> bool {
147    if s.is_empty() || s == "." || s == ".." {
148        return false;
149    }
150    if s.contains("..") {
151        return false;
152    }
153    s.bytes()
154        .all(|b| b.is_ascii_alphanumeric() || b == b'_' || b == b'-' || b == b'.')
155}
156
157// ═══════════════════════════════════════════════════════════════
158// Trait
159// ═══════════════════════════════════════════════════════════════
160
161/// Backend-agnostic key-value state store.
162///
163/// All operations are namespace-scoped.  Implementations must be
164/// `Send + Sync` so they can be shared across Lua VMs (e.g. fork).
165pub trait StateStore: Send + Sync {
166    /// Read a value.  Returns `None` if the key does not exist.
167    fn get(&self, ns: &str, key: &str) -> Result<Option<Value>, String>;
168
169    /// Write a value (upsert).
170    fn set(&self, ns: &str, key: &str, value: Value) -> Result<(), String>;
171
172    /// Remove a key.  Returns `true` if it existed.
173    fn delete(&self, ns: &str, key: &str) -> Result<bool, String>;
174
175    /// List all keys in a namespace.
176    fn keys(&self, ns: &str) -> Result<Vec<String>, String>;
177
178    /// Check whether a key exists.
179    ///
180    /// Whether this is cheaper than `get` + nil check depends on the
181    /// backend.  `JsonFileStore` still loads the whole namespace; backends
182    /// like Redis or SQLite can answer with an `EXISTS` command.
183    fn has(&self, ns: &str, key: &str) -> Result<bool, String>;
184
185    /// Set a value only if the key does **not** already exist.
186    /// Returns `true` if the value was written, `false` if the key
187    /// was already present.
188    ///
189    /// **Note:** `JsonFileStore` serialises this operation per namespace
190    /// with an in-process `Mutex`, making it safe across concurrent tokio
191    /// tasks within the same process.  Cross-process atomicity still
192    /// requires a backend with native CAS (Redis `SETNX`, SQLite
193    /// transactions).
194    fn set_nx(&self, ns: &str, key: &str, value: Value) -> Result<bool, String>;
195
196    /// Counter increment, serialised per namespace within the same process.
197    ///
198    /// Adds `delta` to the current numeric value at `key`.  If the key
199    /// is missing, initialises it to `default` before adding.  Returns
200    /// the new value.
201    ///
202    /// `JsonFileStore` acquires a per-namespace `Mutex` for the full
203    /// read-modify-write cycle, preventing lost updates across concurrent
204    /// tokio tasks.  For multi-process safety use a backend with native
205    /// `INCR` (Redis) or transactions (SQLite).
206    ///
207    /// Uses `f64` internally.  Integer-valued deltas are exact; fractional
208    /// deltas may accumulate floating-point rounding errors over many calls.
209    ///
210    /// Errors if the existing value is not a JSON number.
211    fn incr(&self, ns: &str, key: &str, delta: f64, default: f64) -> Result<f64, String>;
212}
213
214// ═══════════════════════════════════════════════════════════════
215// JsonFileStore — default backend
216// ═══════════════════════════════════════════════════════════════
217
218/// JSON-file-backed state store.
219///
220/// Each namespace is a single JSON file at
221/// `{root}/{namespace}.json`.  Writes are atomic: the new state is
222/// written to a `.tmp` sibling and then renamed.
223///
224/// The root directory is provided at construction time; callers are
225/// expected to resolve it from the service-layer `AppDir` abstraction
226/// (typically `~/.algocline/state/`).
227///
228/// ## Concurrency
229///
230/// Per-namespace locks (`std::sync::Mutex`) prevent lost updates under
231/// concurrent `alc.state.*` calls within the same process.  The lock
232/// is acquired for the full load → mutate → atomic-rename cycle, so
233/// two tokio tasks operating on the **same** namespace are serialised.
234///
235/// Rationale for `std::sync::Mutex` over `tokio::sync::Mutex`:
236/// all I/O inside the lock uses `std::fs` (synchronous, no `.await`),
237/// so a standard mutex is sufficient and avoids holding a tokio mutex
238/// across potential scheduler context switches.
239///
240/// **Multi-process safety is NOT provided.**  If multiple `alc`
241/// processes share the same state directory (uncommon), use a backend
242/// with native `INCR` (Redis) or transactions (SQLite).
243pub struct JsonFileStore {
244    root: PathBuf,
245    /// Per-namespace locks.  Keyed by the resolved JSON file path so
246    /// that namespace validation is already applied before lookup.
247    locks: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>>,
248}
249
250impl JsonFileStore {
251    /// Construct a store rooted at an explicit path.
252    ///
253    /// The directory is **not** created eagerly; it is created lazily
254    /// on the first `set` / `set_nx` / `incr` call via [`Self::state_path`].
255    pub fn new(root: PathBuf) -> Self {
256        Self {
257            root,
258            locks: Mutex::new(HashMap::new()),
259        }
260    }
261
262    /// Acquire (or create) the per-namespace lock and return a guard.
263    ///
264    /// The returned `std::sync::MutexGuard` keeps the namespace lock
265    /// held for the duration of the caller's load → mutate → save cycle.
266    /// The outer `locks` map is released immediately after the inner
267    /// `Arc` is cloned, so contention on unrelated namespaces is zero.
268    fn ns_lock(&self, path: &Path) -> Result<Arc<Mutex<()>>, String> {
269        let mut map = self
270            .locks
271            .lock()
272            .map_err(|_| "state: locks map poisoned".to_string())?;
273        Ok(Arc::clone(
274            map.entry(path.to_path_buf())
275                .or_insert_with(|| Arc::new(Mutex::new(()))),
276        ))
277    }
278
279    /// Return the root directory this store writes under.
280    pub fn root(&self) -> &Path {
281        &self.root
282    }
283
284    /// Ensure the root directory exists, returning it.
285    fn ensure_root(&self) -> Result<&Path, String> {
286        if !self.root.exists() {
287            fs::create_dir_all(&self.root)
288                .map_err(|e| format!("Failed to create state dir: {e}"))?;
289        }
290        Ok(&self.root)
291    }
292
293    /// Resolve the JSON file path for a namespace, validating the name
294    /// and creating the root directory on demand.
295    pub fn state_path(&self, ns: &str) -> Result<PathBuf, String> {
296        if ns.contains('/')
297            || ns.contains('\\')
298            || ns.contains("..")
299            || ns.contains('\0')
300            || ns.is_empty()
301        {
302            return Err(format!("Invalid namespace: '{ns}'"));
303        }
304        let dir = self.ensure_root()?;
305        Ok(dir.join(format!("{ns}.json")))
306    }
307
308    /// Resolve the per-key dispatched file path for a `{prefix}:{id}`
309    /// shaped key, returning `None` when the key does not match the
310    /// dispatch contract (no `:`, multiple `:`, or unsafe characters
311    /// in either segment).
312    ///
313    /// Dispatched layout (issue #1776868812):
314    ///   `{root}/{prefix}/{id}.json` — file contents = the value as
315    ///   raw JSON (no wrapper map). Each `flow.state_save(state)` call
316    ///   becomes a single per-task file rather than another entry
317    ///   crammed into `default.json`.
318    ///
319    /// Legacy layout: keys without `:` (or with unsafe characters)
320    /// continue writing into `{ns}.json` so existing behaviour is
321    /// preserved without migration.
322    fn dispatch_path(&self, key: &str) -> Result<Option<PathBuf>, String> {
323        let (prefix, id) = match key.split_once(':') {
324            Some(pair) => pair,
325            None => return Ok(None),
326        };
327        if !is_safe_segment(prefix) || !is_safe_segment(id) {
328            return Ok(None);
329        }
330        let dir = self.ensure_root()?;
331        Ok(Some(dir.join(prefix).join(format!("{id}.json"))))
332    }
333
334    /// Read a dispatched value file and deserialize it as JSON.
335    /// Returns `Ok(None)` when the file does not exist.
336    fn load_dispatched(&self, path: &Path) -> Result<Option<Value>, String> {
337        if !path.exists() {
338            return Ok(None);
339        }
340        let content = fs::read_to_string(path)
341            .map_err(|e| format!("Failed to read dispatched state '{}': {e}", path.display()))?;
342        let v: Value = serde_json::from_str(&content)
343            .map_err(|e| format!("Failed to parse dispatched state '{}': {e}", path.display()))?;
344        Ok(Some(v))
345    }
346
347    /// Atomically write a value to a dispatched file (tmp + rename).
348    /// Creates the prefix subdirectory if missing.
349    fn save_dispatched(&self, path: &Path, value: &Value) -> Result<(), String> {
350        if let Some(parent) = path.parent() {
351            if !parent.exists() {
352                fs::create_dir_all(parent).map_err(|e| {
353                    format!(
354                        "Failed to create dispatched state dir '{}': {e}",
355                        parent.display()
356                    )
357                })?;
358            }
359        }
360        let tmp = path.with_extension("json.tmp");
361        let content = serde_json::to_string_pretty(value)
362            .map_err(|e| format!("Failed to serialize dispatched state: {e}"))?;
363        fs::write(&tmp, &content)
364            .map_err(|e| format!("Failed to write dispatched state tmp: {e}"))?;
365        fs::rename(&tmp, path)
366            .map_err(|e| format!("Failed to rename dispatched state file: {e}"))?;
367        Ok(())
368    }
369
370    // ─── Dispatched-layout helpers ─────────────────────────────────────────
371
372    /// List all keys in the dispatched layout for a namespace.
373    ///
374    /// Enumerates `{root}/{namespace}/*.json` and returns the file names
375    /// stripped of the `.json` extension, sorted lexicographically.
376    /// Files with extensions other than `.json`, and `.bak` / `.tmp`
377    /// siblings, are excluded.  If the namespace directory does not exist
378    /// the result is an empty `Vec` (namespace-absent ≡ zero keys).
379    ///
380    /// # Arguments
381    /// * `namespace` — the directory name under `root`; must pass
382    ///   [`is_safe_segment`] validation
383    ///
384    /// # Returns
385    /// A sorted list of key strings, or a [`StateError`] on I/O / validation
386    /// failure.
387    ///
388    /// # Errors
389    /// * [`StateError::UnsafeSegment`] if `namespace` fails path-safety check
390    /// * [`StateError::IoRead`] if reading the directory fails
391    pub fn list_dispatched(&self, namespace: &str) -> Result<Vec<String>, StateError> {
392        if !is_safe_segment(namespace) {
393            return Err(StateError::UnsafeSegment {
394                which: "namespace",
395                value: namespace.to_string(),
396            });
397        }
398        let ns_dir = self.root.join(namespace);
399        if !ns_dir.exists() {
400            return Ok(Vec::new());
401        }
402        let mut keys = Vec::new();
403        let entries = fs::read_dir(&ns_dir).map_err(StateError::IoRead)?;
404        for entry in entries {
405            let entry = entry.map_err(StateError::IoRead)?;
406            let fname = entry.file_name();
407            let fname_str = fname.to_string_lossy();
408            // Only include plain `.json` files; skip `.bak`, `.tmp`, and others.
409            if !fname_str.ends_with(".json")
410                || fname_str.ends_with(".json.bak")
411                || fname_str.ends_with(".json.tmp")
412            {
413                continue;
414            }
415            // Strip the `.json` suffix to recover the key name.
416            let key = fname_str
417                .strip_suffix(".json")
418                .unwrap_or(&fname_str)
419                .to_string();
420            keys.push(key);
421        }
422        keys.sort();
423        Ok(keys)
424    }
425
426    /// Read the full JSON value for a dispatched-layout key.
427    ///
428    /// Reads `{root}/{namespace}/{key}.json` and deserializes it.
429    ///
430    /// # Arguments
431    /// * `namespace` — the subdirectory name; must pass [`is_safe_segment`]
432    /// * `key` — the file stem; must pass [`is_safe_segment`]
433    ///
434    /// # Returns
435    /// The deserialized [`serde_json::Value`] on success.
436    ///
437    /// # Errors
438    /// * [`StateError::UnsafeSegment`] if either segment fails path-safety check
439    /// * [`StateError::KeyNotFound`] if the file does not exist
440    /// * [`StateError::IoRead`] if the file cannot be read
441    /// * [`StateError::Serde`] if the content is not valid JSON
442    pub fn show_dispatched(
443        &self,
444        namespace: &str,
445        key: &str,
446    ) -> Result<serde_json::Value, StateError> {
447        if !is_safe_segment(namespace) {
448            return Err(StateError::UnsafeSegment {
449                which: "namespace",
450                value: namespace.to_string(),
451            });
452        }
453        if !is_safe_segment(key) {
454            return Err(StateError::UnsafeSegment {
455                which: "key",
456                value: key.to_string(),
457            });
458        }
459        let target = self.root.join(namespace).join(format!("{key}.json"));
460        if !target.exists() {
461            return Err(StateError::KeyNotFound {
462                namespace: namespace.to_string(),
463                key: key.to_string(),
464            });
465        }
466        let content = fs::read_to_string(&target).map_err(StateError::IoRead)?;
467        let value: serde_json::Value = serde_json::from_str(&content)?;
468        Ok(value)
469    }
470
471    /// Atomically reset a dispatched-layout state file with a backup.
472    ///
473    /// Performs the following sequence in order (Crux atomicity contract):
474    ///
475    /// 1. Validate `namespace` and `key` with [`is_safe_segment`].
476    /// 2. Compute target path: `{root}/{namespace}/{key}.json`.
477    /// 3. Return [`StateError::KeyNotFound`] if the file does not exist.
478    /// 4. Acquire the per-path mutex via [`Self::ns_lock`]; hold until rename.
479    /// 5. Copy the live file to `{root}/{namespace}/{key}.json.bak` — the
480    ///    live file is **not** touched before this point.
481    /// 6. Load and parse the live file.
482    /// 7. Apply in-memory mutations:
483    ///    - Remove each element of `steps` from `data.completed_steps` (if
484    ///      the array exists).
485    ///    - Delete each element of `fields` from the `data` top-level object.
486    ///    - If the top-level `data` field is absent or not an object, return
487    ///      [`StateError::ShapeInvalid`].
488    /// 8. Write the mutated value to `{target}.tmp`.
489    /// 9. Rename `.tmp` → target (POSIX atomic on same filesystem).
490    ///
491    /// A crash between steps 5 and 9 leaves the `.bak` intact and the live
492    /// file unmodified (or only partially written to `.tmp`), so the original
493    /// state is always recoverable.
494    ///
495    /// # Arguments
496    /// * `namespace` — subdirectory name; must pass [`is_safe_segment`]
497    /// * `key` — file stem; must pass [`is_safe_segment`]
498    /// * `steps` — step names to remove from `data.completed_steps`
499    /// * `fields` — field names to delete from the `data` top-level object
500    ///
501    /// # Returns
502    /// A [`ResetReport`] with the backup path and counts of removed items.
503    ///
504    /// # Errors
505    /// * [`StateError::UnsafeSegment`] if either segment fails path-safety check
506    /// * [`StateError::KeyNotFound`] if the file does not exist
507    /// * [`StateError::ShapeInvalid`] if the lock is poisoned or the JSON
508    ///   structure is not a `{data: {...}}` object
509    /// * [`StateError::IoBackup`] if the `.bak` copy fails
510    /// * [`StateError::IoRead`] if loading the live file fails
511    /// * [`StateError::IoWrite`] if the `.tmp` write or rename fails
512    /// * [`StateError::Serde`] if the file content is not valid JSON
513    pub fn reset_dispatched_with_backup(
514        &self,
515        namespace: &str,
516        key: &str,
517        steps: &[String],
518        fields: &[String],
519    ) -> Result<ResetReport, StateError> {
520        // (a) Validate path segments.
521        if !is_safe_segment(namespace) {
522            return Err(StateError::UnsafeSegment {
523                which: "namespace",
524                value: namespace.to_string(),
525            });
526        }
527        if !is_safe_segment(key) {
528            return Err(StateError::UnsafeSegment {
529                which: "key",
530                value: key.to_string(),
531            });
532        }
533
534        // (b) Compute target path.
535        let target = self.root.join(namespace).join(format!("{key}.json"));
536
537        // (c) Return KeyNotFound if the file does not exist.
538        if !target.exists() {
539            return Err(StateError::KeyNotFound {
540                namespace: namespace.to_string(),
541                key: key.to_string(),
542            });
543        }
544
545        // (c.5) Acquire the per-path mutex and hold it until after rename.
546        let lock = self
547            .ns_lock(&target)
548            .map_err(|s| StateError::ShapeInvalid { reason: s })?;
549        let _guard = lock.lock().map_err(|_| StateError::ShapeInvalid {
550            reason: "lock poisoned".to_string(),
551        })?;
552
553        // (d) Create .bak backup — live file is not touched before this.
554        let bak_path = target.with_extension("json.bak");
555        fs::copy(&target, &bak_path).map_err(StateError::IoBackup)?;
556
557        // (e) Load and parse the live file.
558        let content = fs::read_to_string(&target).map_err(StateError::IoRead)?;
559        let mut value: serde_json::Value = serde_json::from_str(&content)?;
560
561        // (f) Apply in-memory mutations.
562        let data = value
563            .get_mut("data")
564            .ok_or_else(|| StateError::ShapeInvalid {
565                reason: "missing 'data' top-level field".to_string(),
566            })?;
567        let data_obj = data
568            .as_object_mut()
569            .ok_or_else(|| StateError::ShapeInvalid {
570                reason: "'data' top-level field must be an object".to_string(),
571            })?;
572
573        // Remove matching entries from data.completed_steps.
574        let mut steps_removed = 0usize;
575        if !steps.is_empty() {
576            if let Some(cs) = data_obj.get_mut("completed_steps") {
577                if let Some(arr) = cs.as_array_mut() {
578                    let before = arr.len();
579                    arr.retain(|v| {
580                        if let Some(s) = v.as_str() {
581                            !steps.iter().any(|step| step == s)
582                        } else {
583                            true
584                        }
585                    });
586                    steps_removed = before - arr.len();
587                } else {
588                    return Err(StateError::ShapeInvalid {
589                        reason: "data.completed_steps must be an array".to_string(),
590                    });
591                }
592            }
593            // If completed_steps key is absent, nothing to remove — not an error.
594        }
595
596        // Delete requested fields from the data object.
597        let mut fields_removed = 0usize;
598        for field in fields {
599            if data_obj.remove(field.as_str()).is_some() {
600                fields_removed += 1;
601            }
602        }
603
604        // (g) Write mutated value to .tmp staging file.
605        let tmp = target.with_extension("json.tmp");
606        let serialized = serde_json::to_string_pretty(&value)?;
607        fs::write(&tmp, &serialized).map_err(StateError::IoWrite)?;
608
609        // (h) Atomic rename: .tmp → live file.
610        fs::rename(&tmp, &target).map_err(StateError::IoWrite)?;
611
612        Ok(ResetReport {
613            backup_path: bak_path,
614            steps_removed,
615            fields_removed,
616        })
617    }
618
619    fn load(&self, ns: &str) -> Result<HashMap<String, Value>, String> {
620        let path = self.state_path(ns)?;
621        if !path.exists() {
622            return Ok(HashMap::new());
623        }
624        let content =
625            fs::read_to_string(&path).map_err(|e| format!("Failed to read state '{ns}': {e}"))?;
626        serde_json::from_str(&content).map_err(|e| format!("Failed to parse state '{ns}': {e}"))
627    }
628
629    fn save(&self, ns: &str, data: &HashMap<String, Value>) -> Result<(), String> {
630        let path = self.state_path(ns)?;
631        let tmp = path.with_extension("json.tmp");
632        let content = serde_json::to_string_pretty(data)
633            .map_err(|e| format!("Failed to serialize state: {e}"))?;
634        fs::write(&tmp, &content).map_err(|e| format!("Failed to write state tmp: {e}"))?;
635        fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename state file: {e}"))?;
636        Ok(())
637    }
638}
639
640impl StateStore for JsonFileStore {
641    fn get(&self, ns: &str, key: &str) -> Result<Option<Value>, String> {
642        // Dispatched path takes precedence — when present, that file is
643        // the canonical source. Falls back to the legacy `{ns}.json`
644        // store so existing entries written before dispatch was enabled
645        // remain readable without migration.
646        if let Some(dpath) = self.dispatch_path(key)? {
647            let lock = self.ns_lock(&dpath)?;
648            let _guard = lock
649                .lock()
650                .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
651            if let Some(v) = self.load_dispatched(&dpath)? {
652                return Ok(Some(v));
653            }
654            // Fall through to legacy lookup so pre-dispatch values remain
655            // visible until the next set() promotes them.
656        }
657        let path = self.state_path(ns)?;
658        let lock = self.ns_lock(&path)?;
659        let _guard = lock
660            .lock()
661            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
662        let state = self.load(ns)?;
663        Ok(state.get(key).cloned())
664    }
665
666    fn set(&self, ns: &str, key: &str, value: Value) -> Result<(), String> {
667        if let Some(dpath) = self.dispatch_path(key)? {
668            let lock = self.ns_lock(&dpath)?;
669            let _guard = lock
670                .lock()
671                .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
672            return self.save_dispatched(&dpath, &value);
673        }
674        let path = self.state_path(ns)?;
675        let lock = self.ns_lock(&path)?;
676        let _guard = lock
677            .lock()
678            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
679        let mut state = self.load(ns)?;
680        state.insert(key.to_string(), value);
681        self.save(ns, &state)
682    }
683
684    fn delete(&self, ns: &str, key: &str) -> Result<bool, String> {
685        if let Some(dpath) = self.dispatch_path(key)? {
686            let lock = self.ns_lock(&dpath)?;
687            let _guard = lock
688                .lock()
689                .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
690            if dpath.exists() {
691                fs::remove_file(&dpath).map_err(|e| {
692                    format!(
693                        "Failed to delete dispatched state '{}': {e}",
694                        dpath.display()
695                    )
696                })?;
697                return Ok(true);
698            }
699            // Fall through to legacy delete in case the entry only exists
700            // in the legacy single-file store.
701        }
702        let path = self.state_path(ns)?;
703        let lock = self.ns_lock(&path)?;
704        let _guard = lock
705            .lock()
706            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
707        let mut state = self.load(ns)?;
708        let existed = state.remove(key).is_some();
709        if existed {
710            self.save(ns, &state)?;
711        }
712        Ok(existed)
713    }
714
715    fn keys(&self, ns: &str) -> Result<Vec<String>, String> {
716        let path = self.state_path(ns)?;
717        let lock = self.ns_lock(&path)?;
718        let _guard = lock
719            .lock()
720            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
721        let state = self.load(ns)?;
722        Ok(state.keys().cloned().collect())
723    }
724
725    fn has(&self, ns: &str, key: &str) -> Result<bool, String> {
726        if let Some(dpath) = self.dispatch_path(key)? {
727            let lock = self.ns_lock(&dpath)?;
728            let _guard = lock
729                .lock()
730                .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
731            if dpath.exists() {
732                return Ok(true);
733            }
734            // Fall through to legacy check.
735        }
736        let path = self.state_path(ns)?;
737        let lock = self.ns_lock(&path)?;
738        let _guard = lock
739            .lock()
740            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
741        let state = self.load(ns)?;
742        Ok(state.contains_key(key))
743    }
744
745    fn set_nx(&self, ns: &str, key: &str, value: Value) -> Result<bool, String> {
746        if let Some(dpath) = self.dispatch_path(key)? {
747            let lock = self.ns_lock(&dpath)?;
748            let _guard = lock
749                .lock()
750                .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
751            if dpath.exists() {
752                return Ok(false);
753            }
754            // Also honour any legacy entry to preserve set_nx semantics
755            // across the migration boundary.
756            let path = self.state_path(ns)?;
757            if path.exists() {
758                let state = self.load(ns)?;
759                if state.contains_key(key) {
760                    return Ok(false);
761                }
762            }
763            self.save_dispatched(&dpath, &value)?;
764            return Ok(true);
765        }
766        let path = self.state_path(ns)?;
767        let lock = self.ns_lock(&path)?;
768        let _guard = lock
769            .lock()
770            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
771        let mut state = self.load(ns)?;
772        if state.contains_key(key) {
773            return Ok(false);
774        }
775        state.insert(key.to_string(), value);
776        self.save(ns, &state)?;
777        Ok(true)
778    }
779
780    fn incr(&self, ns: &str, key: &str, delta: f64, default: f64) -> Result<f64, String> {
781        if let Some(dpath) = self.dispatch_path(key)? {
782            let lock = self.ns_lock(&dpath)?;
783            let _guard = lock
784                .lock()
785                .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
786            let current = if let Some(v) = self.load_dispatched(&dpath)? {
787                v.as_f64()
788                    .ok_or_else(|| format!("incr: value at '{key}' is not a number"))?
789            } else {
790                // Fall back to any legacy value so incr stays monotonic
791                // across the dispatch transition.
792                let path = self.state_path(ns)?;
793                if path.exists() {
794                    let state = self.load(ns)?;
795                    match state.get(key) {
796                        Some(v) => v
797                            .as_f64()
798                            .ok_or_else(|| format!("incr: value at '{key}' is not a number"))?,
799                        None => default,
800                    }
801                } else {
802                    default
803                }
804            };
805            let new_val = current + delta;
806            self.save_dispatched(&dpath, &serde_json::json!(new_val))?;
807            return Ok(new_val);
808        }
809        let path = self.state_path(ns)?;
810        let lock = self.ns_lock(&path)?;
811        let _guard = lock
812            .lock()
813            .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
814        let mut state = self.load(ns)?;
815        let current = match state.get(key) {
816            Some(v) => v
817                .as_f64()
818                .ok_or_else(|| format!("incr: value at '{key}' is not a number"))?,
819            None => default,
820        };
821        let new_val = current + delta;
822        state.insert(key.to_string(), serde_json::json!(new_val));
823        self.save(ns, &state)?;
824        Ok(new_val)
825    }
826}
827
828#[cfg(test)]
829mod tests {
830    use super::*;
831    use tempfile::TempDir;
832
833    /// Create a JsonFileStore rooted in a fresh tempdir, returning both
834    /// so the TempDir guard lives for the test duration.
835    fn new_store() -> (JsonFileStore, TempDir) {
836        let tmp = tempfile::tempdir().unwrap();
837        let store = JsonFileStore::new(tmp.path().to_path_buf());
838        (store, tmp)
839    }
840
841    #[test]
842    fn roundtrip() {
843        let (store, _tmp) = new_store();
844        let ns = "rt";
845
846        store.set(ns, "count", serde_json::json!(42)).unwrap();
847        store
848            .set(ns, "name", serde_json::json!("algocline"))
849            .unwrap();
850
851        assert_eq!(store.get(ns, "count").unwrap(), Some(serde_json::json!(42)));
852        assert_eq!(
853            store.get(ns, "name").unwrap(),
854            Some(serde_json::json!("algocline"))
855        );
856        assert_eq!(store.get(ns, "missing").unwrap(), None);
857
858        let k = store.keys(ns).unwrap();
859        assert!(k.contains(&"count".to_string()));
860        assert!(k.contains(&"name".to_string()));
861
862        assert!(store.delete(ns, "count").unwrap());
863        assert!(!store.delete(ns, "count").unwrap());
864        assert_eq!(store.get(ns, "count").unwrap(), None);
865    }
866
867    #[test]
868    fn invalid_namespace() {
869        let (store, _tmp) = new_store();
870        assert!(store.state_path("../evil").is_err());
871        assert!(store.state_path("foo/bar").is_err());
872        assert!(store.state_path("foo\\bar").is_err());
873        assert!(store.state_path("").is_err());
874        assert!(store.state_path("foo\0bar").is_err());
875    }
876
877    #[test]
878    fn get_nonexistent_namespace_returns_empty() {
879        let (store, _tmp) = new_store();
880        let result = store.get("ghost_ns", "any_key").unwrap();
881        assert_eq!(result, None);
882    }
883
884    #[test]
885    fn keys_nonexistent_namespace_returns_empty() {
886        let (store, _tmp) = new_store();
887        let result = store.keys("ghost_ns").unwrap();
888        assert!(result.is_empty());
889    }
890
891    #[test]
892    fn delete_nonexistent_key_returns_false() {
893        let (store, _tmp) = new_store();
894        assert!(!store.delete("delns", "nope").unwrap());
895    }
896
897    #[test]
898    fn set_overwrites_existing_value() {
899        let (store, _tmp) = new_store();
900        let ns = "ow";
901
902        store.set(ns, "k", serde_json::json!(1)).unwrap();
903        store.set(ns, "k", serde_json::json!(2)).unwrap();
904        assert_eq!(store.get(ns, "k").unwrap(), Some(serde_json::json!(2)));
905    }
906
907    #[test]
908    fn state_path_valid_namespaces() {
909        let (store, _tmp) = new_store();
910        assert!(store.state_path("default").is_ok());
911        assert!(store.state_path("my-app").is_ok());
912        assert!(store.state_path("test_123").is_ok());
913    }
914
915    // ─── Tier 1: has / set_nx / incr ──────────────────────────
916
917    #[test]
918    fn has_returns_existence() {
919        let (store, _tmp) = new_store();
920        let ns = "hasns";
921
922        assert!(!store.has(ns, "x").unwrap());
923        store.set(ns, "x", serde_json::json!(1)).unwrap();
924        assert!(store.has(ns, "x").unwrap());
925    }
926
927    #[test]
928    fn set_nx_only_sets_if_absent() {
929        let (store, _tmp) = new_store();
930        let ns = "snx";
931
932        assert!(store.set_nx(ns, "k", serde_json::json!("first")).unwrap());
933        assert!(!store.set_nx(ns, "k", serde_json::json!("second")).unwrap());
934        assert_eq!(
935            store.get(ns, "k").unwrap(),
936            Some(serde_json::json!("first")),
937            "set_nx should not overwrite"
938        );
939    }
940
941    #[test]
942    fn incr_initialises_and_increments() {
943        let (store, _tmp) = new_store();
944        let ns = "inc";
945
946        // Missing key: initialise from default (0) + delta (1) = 1
947        let v = store.incr(ns, "counter", 1.0, 0.0).unwrap();
948        assert!((v - 1.0).abs() < f64::EPSILON);
949
950        // Increment existing
951        let v = store.incr(ns, "counter", 5.0, 0.0).unwrap();
952        assert!((v - 6.0).abs() < f64::EPSILON);
953
954        // Negative delta
955        let v = store.incr(ns, "counter", -2.0, 0.0).unwrap();
956        assert!((v - 4.0).abs() < f64::EPSILON);
957    }
958
959    #[test]
960    fn incr_rejects_non_numeric() {
961        let (store, _tmp) = new_store();
962        let ns = "incerr";
963
964        store.set(ns, "s", serde_json::json!("hello")).unwrap();
965        let err = store.incr(ns, "s", 1.0, 0.0).unwrap_err();
966        assert!(err.contains("not a number"), "got: {err}");
967    }
968
969    #[test]
970    fn incr_custom_default() {
971        let (store, _tmp) = new_store();
972        let ns = "incdef";
973
974        let v = store.incr(ns, "score", 10.0, 100.0).unwrap();
975        assert!((v - 110.0).abs() < f64::EPSILON, "100 + 10 = 110");
976    }
977
978    // ─── Per-key dispatch (issue #1776868812) ─────────────────────────
979
980    /// Keys shaped `{prefix}:{id}` with safe segments are written to
981    /// `{root}/{prefix}/{id}.json` rather than crammed into the legacy
982    /// `{ns}.json` SSoT.
983    #[test]
984    fn dispatch_writes_to_per_key_file_for_prefix_id_keys() {
985        let (store, tmp) = new_store();
986        store
987            .set(
988                "default",
989                "flow_orch:abc-123",
990                serde_json::json!({"step": 1}),
991            )
992            .unwrap();
993        let dispatched = tmp.path().join("flow_orch").join("abc-123.json");
994        assert!(
995            dispatched.exists(),
996            "dispatched file must exist at {}",
997            dispatched.display()
998        );
999        // Legacy file must NOT have been touched for this key.
1000        let legacy = tmp.path().join("default.json");
1001        assert!(
1002            !legacy.exists(),
1003            "legacy default.json must not be created for dispatched keys"
1004        );
1005    }
1006
1007    /// Read path: dispatched file takes precedence; legacy `{ns}.json`
1008    /// is consulted only when the dispatched file is absent (so
1009    /// pre-dispatch entries remain readable without migration).
1010    #[test]
1011    fn dispatch_read_falls_back_to_legacy_for_unmigrated_entries() {
1012        let (store, tmp) = new_store();
1013        // Pre-populate the legacy default.json by writing a key without
1014        // a `:` (forces the legacy path) then manually inject the
1015        // dispatched-shaped key into the same file to simulate a state
1016        // produced before dispatch was enabled.
1017        store
1018            .set("default", "boot_marker", serde_json::json!(true))
1019            .unwrap();
1020        let legacy_path = tmp.path().join("default.json");
1021        let mut existing: HashMap<String, Value> =
1022            serde_json::from_str(&std::fs::read_to_string(&legacy_path).unwrap()).unwrap();
1023        existing.insert(
1024            "flow_legacy:xyz".to_string(),
1025            serde_json::json!({"old": "value"}),
1026        );
1027        std::fs::write(
1028            &legacy_path,
1029            serde_json::to_string_pretty(&existing).unwrap(),
1030        )
1031        .unwrap();
1032
1033        // Read returns the legacy value because no dispatched file exists.
1034        assert_eq!(
1035            store.get("default", "flow_legacy:xyz").unwrap(),
1036            Some(serde_json::json!({"old": "value"})),
1037            "must fall back to legacy default.json when dispatched file absent"
1038        );
1039
1040        // Once we set a new value, it lands in the dispatched file and
1041        // future reads see the new value (legacy entry is shadowed).
1042        store
1043            .set(
1044                "default",
1045                "flow_legacy:xyz",
1046                serde_json::json!({"new": "promoted"}),
1047            )
1048            .unwrap();
1049        assert!(
1050            tmp.path().join("flow_legacy").join("xyz.json").exists(),
1051            "set() must promote dispatched-shaped keys to per-key file"
1052        );
1053        assert_eq!(
1054            store.get("default", "flow_legacy:xyz").unwrap(),
1055            Some(serde_json::json!({"new": "promoted"})),
1056            "dispatched file must shadow legacy entry on subsequent reads"
1057        );
1058    }
1059
1060    /// Keys without a `:` separator (or with unsafe characters in either
1061    /// segment) bypass dispatch and use the legacy single-file store.
1062    #[test]
1063    fn dispatch_skips_keys_without_colon_or_with_unsafe_segments() {
1064        let (store, tmp) = new_store();
1065        store
1066            .set("default", "no_colon", serde_json::json!(1))
1067            .unwrap();
1068        store
1069            .set("default", "bad/prefix:id", serde_json::json!(2))
1070            .unwrap();
1071        store
1072            .set("default", "prefix:bad/id", serde_json::json!(3))
1073            .unwrap();
1074        store
1075            .set("default", "prefix:..", serde_json::json!(4))
1076            .unwrap();
1077        // All four go to legacy default.json.
1078        let legacy = tmp.path().join("default.json");
1079        let raw: HashMap<String, Value> =
1080            serde_json::from_str(&std::fs::read_to_string(&legacy).unwrap()).unwrap();
1081        assert_eq!(raw.get("no_colon"), Some(&serde_json::json!(1)));
1082        assert_eq!(raw.get("bad/prefix:id"), Some(&serde_json::json!(2)));
1083        assert_eq!(raw.get("prefix:bad/id"), Some(&serde_json::json!(3)));
1084        assert_eq!(raw.get("prefix:.."), Some(&serde_json::json!(4)));
1085        // No subdirectories were created.
1086        assert!(!tmp.path().join("bad").exists());
1087        assert!(!tmp.path().join("prefix").exists());
1088    }
1089
1090    /// `delete` removes the dispatched file and returns `true`.
1091    #[test]
1092    fn dispatch_delete_removes_per_key_file() {
1093        let (store, tmp) = new_store();
1094        store.set("default", "p:q", serde_json::json!("v")).unwrap();
1095        let dispatched = tmp.path().join("p").join("q.json");
1096        assert!(
1097            dispatched.exists(),
1098            "dispatched file should exist before delete"
1099        );
1100        assert!(store.delete("default", "p:q").unwrap());
1101        assert!(
1102            !dispatched.exists(),
1103            "dispatched file should be removed after delete"
1104        );
1105        // Re-deleting returns false.
1106        assert!(!store.delete("default", "p:q").unwrap());
1107    }
1108
1109    /// `has` reflects dispatched file existence.
1110    #[test]
1111    fn dispatch_has_reports_dispatched_file_existence() {
1112        let (store, _tmp) = new_store();
1113        assert!(!store.has("default", "p:q").unwrap());
1114        store.set("default", "p:q", serde_json::json!("v")).unwrap();
1115        assert!(store.has("default", "p:q").unwrap());
1116    }
1117
1118    /// `set_nx` honours both the dispatched file and any legacy entry
1119    /// to keep set-if-not-exists semantics consistent across migration.
1120    #[test]
1121    fn dispatch_set_nx_blocks_when_legacy_or_dispatched_entry_exists() {
1122        let (store, tmp) = new_store();
1123        // Inject a legacy entry under the dispatched-shaped key.
1124        store
1125            .set("default", "boot", serde_json::json!(true))
1126            .unwrap();
1127        let legacy_path = tmp.path().join("default.json");
1128        let mut existing: HashMap<String, Value> =
1129            serde_json::from_str(&std::fs::read_to_string(&legacy_path).unwrap()).unwrap();
1130        existing.insert("p:q".to_string(), serde_json::json!("legacy_only"));
1131        std::fs::write(
1132            &legacy_path,
1133            serde_json::to_string_pretty(&existing).unwrap(),
1134        )
1135        .unwrap();
1136        // set_nx must refuse because the legacy entry exists.
1137        assert!(!store
1138            .set_nx("default", "p:q", serde_json::json!("new"))
1139            .unwrap());
1140
1141        // For a fresh dispatched-shaped key with no legacy entry, set_nx
1142        // creates the dispatched file and returns true; second call
1143        // returns false because the dispatched file now exists.
1144        assert!(store
1145            .set_nx("default", "p:r", serde_json::json!("first"))
1146            .unwrap());
1147        assert!(tmp.path().join("p").join("r.json").exists());
1148        assert!(!store
1149            .set_nx("default", "p:r", serde_json::json!("second"))
1150            .unwrap());
1151    }
1152
1153    /// `incr` operates on the dispatched file when the key matches the
1154    /// dispatch pattern; legacy values are migrated forward on the
1155    /// first call.
1156    #[test]
1157    fn dispatch_incr_promotes_legacy_value_on_first_call() {
1158        let (store, tmp) = new_store();
1159        // Pre-populate a legacy numeric value under a dispatched-shaped key.
1160        store.set("default", "seed", serde_json::json!(0)).unwrap();
1161        let legacy_path = tmp.path().join("default.json");
1162        let mut existing: HashMap<String, Value> =
1163            serde_json::from_str(&std::fs::read_to_string(&legacy_path).unwrap()).unwrap();
1164        existing.insert("counter:cnt".to_string(), serde_json::json!(7));
1165        std::fs::write(
1166            &legacy_path,
1167            serde_json::to_string_pretty(&existing).unwrap(),
1168        )
1169        .unwrap();
1170
1171        // First incr: reads legacy value (7), writes new value (10) to
1172        // the dispatched file.
1173        let result = store.incr("default", "counter:cnt", 3.0, 0.0).unwrap();
1174        assert_eq!(result, 10.0);
1175        let dispatched = tmp.path().join("counter").join("cnt.json");
1176        assert!(dispatched.exists(), "dispatched file must be created");
1177
1178        // Second incr: reads dispatched (10), writes 12.
1179        let result2 = store.incr("default", "counter:cnt", 2.0, 0.0).unwrap();
1180        assert_eq!(result2, 12.0);
1181    }
1182
1183    /// `is_safe_segment` accepts alphanumerics + `_-.` and rejects
1184    /// path traversal sequences and reserved names.
1185    #[test]
1186    fn is_safe_segment_validates_path_safety() {
1187        assert!(is_safe_segment("flow_orch"));
1188        assert!(is_safe_segment("abc-123"));
1189        assert!(is_safe_segment("v1.2.3"));
1190        assert!(!is_safe_segment(""));
1191        assert!(!is_safe_segment("."));
1192        assert!(!is_safe_segment(".."));
1193        assert!(!is_safe_segment("a..b"));
1194        assert!(!is_safe_segment("a/b"));
1195        assert!(!is_safe_segment("a\\b"));
1196        assert!(!is_safe_segment("a b"));
1197        assert!(!is_safe_segment("a\0b"));
1198    }
1199
1200    // ─── Dispatched-layout helpers ─────────────────────────────────────────
1201
1202    mod dispatched_layout {
1203        use super::*;
1204
1205        /// Helper: write a JSON file directly into `{tmp}/{ns}/{key}.json`,
1206        /// creating the parent directory if needed.
1207        fn seed(tmp: &TempDir, ns: &str, key: &str, value: serde_json::Value) {
1208            let dir = tmp.path().join(ns);
1209            // Safe: test-only helper, directory creation cannot fail in practice
1210            fs::create_dir_all(&dir).expect("create ns dir");
1211            let path = dir.join(format!("{key}.json"));
1212            fs::write(
1213                path,
1214                serde_json::to_string_pretty(&value).expect("serialize"),
1215            )
1216            .expect("write seed file");
1217        }
1218
1219        /// `list_dispatched` returns only `.json` files and strips the suffix.
1220        #[test]
1221        fn list_returns_json_keys_only() {
1222            let (store, tmp) = new_store();
1223            seed(&tmp, "myns", "alpha", serde_json::json!(1));
1224            seed(&tmp, "myns", "beta", serde_json::json!(2));
1225            // Place non-.json and sibling files that must be excluded.
1226            let ns_dir = tmp.path().join("myns");
1227            fs::write(ns_dir.join("alpha.json.bak"), b"backup").expect("write bak");
1228            fs::write(ns_dir.join("alpha.json.tmp"), b"tmp").expect("write tmp");
1229            fs::write(ns_dir.join("notes.txt"), b"text").expect("write txt");
1230
1231            let keys = store.list_dispatched("myns").unwrap();
1232            assert_eq!(
1233                keys,
1234                vec!["alpha", "beta"],
1235                "must be sorted, .bak/.tmp/.txt excluded"
1236            );
1237        }
1238
1239        /// `list_dispatched` returns an empty Vec when the namespace directory
1240        /// does not exist (no error).
1241        #[test]
1242        fn list_returns_empty_for_absent_namespace() {
1243            let (store, _tmp) = new_store();
1244            let keys = store.list_dispatched("ghost").unwrap();
1245            assert!(keys.is_empty(), "absent namespace should return empty Vec");
1246        }
1247
1248        /// `list_dispatched` handles a namespace directory that exists but
1249        /// contains only non-`.json` files.
1250        #[test]
1251        fn list_returns_empty_when_only_non_json_files_present() {
1252            let (store, tmp) = new_store();
1253            let ns_dir = tmp.path().join("empty_ns");
1254            // Safe: test setup
1255            fs::create_dir_all(&ns_dir).expect("create dir");
1256            fs::write(ns_dir.join("readme.txt"), b"hi").expect("write");
1257            let keys = store.list_dispatched("empty_ns").unwrap();
1258            assert!(keys.is_empty());
1259        }
1260
1261        /// `show_dispatched` returns `KeyNotFound` when the namespace
1262        /// directory itself does not exist.
1263        #[test]
1264        fn show_returns_key_not_found_for_absent_namespace() {
1265            let (store, _tmp) = new_store();
1266            let err = store.show_dispatched("nodir", "anykey").unwrap_err();
1267            assert!(
1268                matches!(err, StateError::KeyNotFound { .. }),
1269                "expected KeyNotFound, got: {err}"
1270            );
1271            // Confirm the message contains "not found" as specified by the error format.
1272            assert!(err.to_string().contains("not found"), "{err}");
1273        }
1274
1275        /// `show_dispatched` returns `KeyNotFound` when the namespace
1276        /// directory exists but the key file is absent.
1277        #[test]
1278        fn show_returns_key_not_found_for_absent_key() {
1279            let (store, tmp) = new_store();
1280            // Create the namespace directory but not the key file.
1281            let ns_dir = tmp.path().join("myns2");
1282            // Safe: test setup
1283            fs::create_dir_all(&ns_dir).expect("create dir");
1284
1285            let err = store.show_dispatched("myns2", "missing").unwrap_err();
1286            assert!(
1287                matches!(err, StateError::KeyNotFound { .. }),
1288                "expected KeyNotFound, got: {err}"
1289            );
1290        }
1291
1292        /// `show_dispatched` returns the full JSON value when the key exists.
1293        #[test]
1294        fn show_returns_full_value_for_existing_key() {
1295            let (store, tmp) = new_store();
1296            let expected = serde_json::json!({"data": {"completed_steps": ["a", "b"], "x": 42}});
1297            seed(&tmp, "showns", "task1", expected.clone());
1298
1299            let result = store.show_dispatched("showns", "task1").unwrap();
1300            assert_eq!(result, expected);
1301        }
1302    }
1303
1304    mod reset_atomicity {
1305        use super::*;
1306
1307        /// Helper: write a JSON file directly into `{tmp}/{ns}/{key}.json`.
1308        fn seed(tmp: &TempDir, ns: &str, key: &str, value: serde_json::Value) {
1309            let dir = tmp.path().join(ns);
1310            // Safe: test setup
1311            fs::create_dir_all(&dir).expect("create ns dir");
1312            let path = dir.join(format!("{key}.json"));
1313            fs::write(
1314                path,
1315                serde_json::to_string_pretty(&value).expect("serialize"),
1316            )
1317            .expect("write seed");
1318        }
1319
1320        /// Reset removes specified steps and fields; backup file contains the
1321        /// original content; report reflects what was removed.
1322        #[test]
1323        fn reset_removes_steps_and_fields_and_creates_backup() {
1324            let (store, tmp) = new_store();
1325            let original = serde_json::json!({
1326                "data": {
1327                    "completed_steps": ["a", "b", "c"],
1328                    "x": 1,
1329                    "y": "hello"
1330                }
1331            });
1332            seed(&tmp, "testns", "task1", original.clone());
1333
1334            let report = store
1335                .reset_dispatched_with_backup(
1336                    "testns",
1337                    "task1",
1338                    &["b".to_string()],
1339                    &["x".to_string()],
1340                )
1341                .unwrap();
1342
1343            // Backup must exist and contain original content.
1344            let bak_path = tmp.path().join("testns").join("task1.json.bak");
1345            assert!(
1346                bak_path.exists(),
1347                ".bak file must exist at {}",
1348                bak_path.display()
1349            );
1350            assert_eq!(report.backup_path, bak_path);
1351            let bak_content: serde_json::Value =
1352                serde_json::from_str(&fs::read_to_string(&bak_path).expect("read bak"))
1353                    .expect("parse bak");
1354            assert_eq!(bak_content, original, ".bak must contain original content");
1355
1356            // Live file must reflect mutations.
1357            let live_path = tmp.path().join("testns").join("task1.json");
1358            let live_content: serde_json::Value =
1359                serde_json::from_str(&fs::read_to_string(&live_path).expect("read live"))
1360                    .expect("parse live");
1361            let expected = serde_json::json!({
1362                "data": {
1363                    "completed_steps": ["a", "c"],
1364                    "y": "hello"
1365                }
1366            });
1367            assert_eq!(live_content, expected, "live file must be mutated");
1368
1369            // Report counts.
1370            assert_eq!(report.steps_removed, 1, "one step removed");
1371            assert_eq!(report.fields_removed, 1, "one field removed");
1372        }
1373
1374        /// Reset with both steps and fields removed (2-case variant).
1375        #[test]
1376        fn reset_removes_multiple_steps_and_fields() {
1377            let (store, tmp) = new_store();
1378            let original = serde_json::json!({
1379                "data": {
1380                    "completed_steps": ["s1", "s2", "s3", "s4"],
1381                    "repo_readiness": "NOT_READY",
1382                    "repo_readiness_report": "details here",
1383                    "plan_gate_retries": 2
1384                }
1385            });
1386            seed(&tmp, "orchns", "task-abc", original.clone());
1387
1388            let report = store
1389                .reset_dispatched_with_backup(
1390                    "orchns",
1391                    "task-abc",
1392                    &["s2".to_string(), "s3".to_string()],
1393                    &[
1394                        "repo_readiness".to_string(),
1395                        "repo_readiness_report".to_string(),
1396                    ],
1397                )
1398                .unwrap();
1399
1400            let live_path = tmp.path().join("orchns").join("task-abc.json");
1401            let live: serde_json::Value =
1402                serde_json::from_str(&fs::read_to_string(&live_path).expect("read"))
1403                    .expect("parse");
1404            assert_eq!(
1405                live["data"]["completed_steps"],
1406                serde_json::json!(["s1", "s4"])
1407            );
1408            assert!(live["data"].get("repo_readiness").is_none());
1409            assert!(live["data"].get("repo_readiness_report").is_none());
1410            assert_eq!(live["data"]["plan_gate_retries"], 2);
1411
1412            assert_eq!(report.steps_removed, 2);
1413            assert_eq!(report.fields_removed, 2);
1414        }
1415
1416        /// Reset on a missing key returns `KeyNotFound`.
1417        #[test]
1418        fn reset_returns_key_not_found_for_absent_file() {
1419            let (store, _tmp) = new_store();
1420            let err = store
1421                .reset_dispatched_with_backup("ns", "missing", &[], &[])
1422                .unwrap_err();
1423            assert!(
1424                matches!(err, StateError::KeyNotFound { .. }),
1425                "expected KeyNotFound, got: {err}"
1426            );
1427        }
1428
1429        /// Reset returns `ShapeInvalid` when `data` top-level field is absent.
1430        #[test]
1431        fn reset_returns_shape_invalid_when_data_absent() {
1432            let (store, tmp) = new_store();
1433            // File has no "data" key.
1434            let bad = serde_json::json!({"identity": {"task_id": "t1"}});
1435            let dir = tmp.path().join("badns");
1436            // Safe: test setup
1437            fs::create_dir_all(&dir).expect("create dir");
1438            fs::write(
1439                dir.join("k.json"),
1440                serde_json::to_string_pretty(&bad).expect("ser"),
1441            )
1442            .expect("write");
1443
1444            let err = store
1445                .reset_dispatched_with_backup("badns", "k", &["s".to_string()], &[])
1446                .unwrap_err();
1447            assert!(
1448                matches!(err, StateError::ShapeInvalid { .. }),
1449                "expected ShapeInvalid, got: {err}"
1450            );
1451            assert!(err.to_string().contains("data"), "{err}");
1452        }
1453
1454        /// Reset returns `ShapeInvalid` when `data.completed_steps` is not
1455        /// an array.
1456        #[test]
1457        fn reset_returns_shape_invalid_when_completed_steps_not_array() {
1458            let (store, tmp) = new_store();
1459            // completed_steps is an object, not an array.
1460            let bad = serde_json::json!({"data": {"completed_steps": {"step": "a"}}});
1461            let dir = tmp.path().join("badns2");
1462            // Safe: test setup
1463            fs::create_dir_all(&dir).expect("create dir");
1464            fs::write(
1465                dir.join("k.json"),
1466                serde_json::to_string_pretty(&bad).expect("ser"),
1467            )
1468            .expect("write");
1469
1470            let err = store
1471                .reset_dispatched_with_backup("badns2", "k", &["a".to_string()], &[])
1472                .unwrap_err();
1473            assert!(
1474                matches!(err, StateError::ShapeInvalid { .. }),
1475                "expected ShapeInvalid, got: {err}"
1476            );
1477            assert!(
1478                err.to_string().contains("completed_steps"),
1479                "message should mention completed_steps: {err}"
1480            );
1481        }
1482    }
1483
1484    mod path_traversal {
1485        use super::*;
1486
1487        /// `list_dispatched` rejects unsafe namespace segments.
1488        #[test]
1489        fn list_rejects_unsafe_namespace() {
1490            let (store, _tmp) = new_store();
1491            let err = store.list_dispatched("../evil").unwrap_err();
1492            assert!(
1493                matches!(
1494                    err,
1495                    StateError::UnsafeSegment {
1496                        which: "namespace",
1497                        ..
1498                    }
1499                ),
1500                "expected UnsafeSegment{{namespace}}, got: {err}"
1501            );
1502        }
1503
1504        /// `show_dispatched` rejects an unsafe key segment.
1505        #[test]
1506        fn show_rejects_unsafe_key() {
1507            let (store, _tmp) = new_store();
1508            let err = store.show_dispatched("ns", "foo/bar").unwrap_err();
1509            assert!(
1510                matches!(err, StateError::UnsafeSegment { which: "key", .. }),
1511                "expected UnsafeSegment{{key}}, got: {err}"
1512            );
1513        }
1514
1515        /// `reset_dispatched_with_backup` rejects an empty namespace segment.
1516        #[test]
1517        fn reset_rejects_empty_namespace() {
1518            let (store, _tmp) = new_store();
1519            let err = store
1520                .reset_dispatched_with_backup("", "key", &[], &[])
1521                .unwrap_err();
1522            assert!(
1523                matches!(
1524                    err,
1525                    StateError::UnsafeSegment {
1526                        which: "namespace",
1527                        ..
1528                    }
1529                ),
1530                "expected UnsafeSegment{{namespace}}, got: {err}"
1531            );
1532        }
1533
1534        /// `reset_dispatched_with_backup` rejects a `..` key segment.
1535        #[test]
1536        fn reset_rejects_dotdot_key() {
1537            let (store, _tmp) = new_store();
1538            let err = store
1539                .reset_dispatched_with_backup("ns", "..", &[], &[])
1540                .unwrap_err();
1541            assert!(
1542                matches!(err, StateError::UnsafeSegment { which: "key", .. }),
1543                "expected UnsafeSegment{{key}}, got: {err}"
1544            );
1545        }
1546    }
1547}
1548
1549#[cfg(test)]
1550mod proptests {
1551    use super::*;
1552    use proptest::prelude::*;
1553
1554    fn new_store() -> (JsonFileStore, tempfile::TempDir) {
1555        let tmp = tempfile::tempdir().unwrap();
1556        let store = JsonFileStore::new(tmp.path().to_path_buf());
1557        (store, tmp)
1558    }
1559
1560    proptest! {
1561        /// Any valid namespace (alphanumeric + hyphen/underscore) round-trips through set/get.
1562        #[test]
1563        fn roundtrip_arbitrary_values(
1564            key in "[a-z]{1,20}",
1565            val in any::<i64>(),
1566        ) {
1567            let (store, _tmp) = new_store();
1568            let ns = "rt";
1569            let json_val = serde_json::json!(val);
1570            store.set(ns, &key, json_val.clone()).unwrap();
1571            let got = store.get(ns, &key).unwrap();
1572            prop_assert_eq!(got, Some(json_val));
1573            let _ = store.delete(ns, &key);
1574        }
1575
1576        /// Path traversal patterns are always rejected.
1577        #[test]
1578        fn traversal_always_rejected(
1579            prefix in "[a-z]{0,5}",
1580            suffix in "[a-z]{0,5}",
1581        ) {
1582            let (store, _tmp) = new_store();
1583            let evil = format!("{prefix}/../{suffix}");
1584            prop_assert!(store.state_path(&evil).is_err());
1585        }
1586
1587        /// state_path rejects NUL bytes anywhere in the namespace.
1588        #[test]
1589        fn nul_byte_always_rejected(
1590            prefix in "[a-z]{0,10}",
1591            suffix in "[a-z]{0,10}",
1592        ) {
1593            let (store, _tmp) = new_store();
1594            let evil = format!("{prefix}\0{suffix}");
1595            prop_assert!(store.state_path(&evil).is_err());
1596        }
1597    }
1598}