use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use serde_json::Value;
/// Errors returned by the typed dispatched-state APIs on `JsonFileStore`
/// (`list_dispatched`, `show_dispatched`, `reset_dispatched_with_backup`).
#[derive(thiserror::Error, Debug)]
pub enum StateError {
/// No per-key file exists for `key` under `namespace`.
#[error("state: key '{key}' not found in namespace '{namespace}'")]
KeyNotFound { namespace: String, key: String },
/// A namespace or key failed the path-segment safety check; `which` names
/// the rejected argument (`"namespace"` or `"key"`) and `value` echoes it.
#[error("state: unsafe {which} segment '{value}'")]
UnsafeSegment { which: &'static str, value: String },
/// Copying the live file to its `.json.bak` sibling failed.
#[error("state: backup I/O failed: {0}")]
IoBackup(#[source] std::io::Error),
/// Reading a directory listing or a state file failed.
#[error("state: read failed: {0}")]
IoRead(#[source] std::io::Error),
/// Writing the temporary file or renaming it over the live file failed.
#[error("state: write failed: {0}")]
IoWrite(#[source] std::io::Error),
/// JSON parsing or serialization failed (converted automatically via `?`).
#[error("state: serialize/parse failed: {0}")]
Serde(#[from] serde_json::Error),
/// The stored document does not have the expected structure. The reset
/// API also uses this variant to report lock acquisition failures.
#[error("state: shape invalid: {reason}")]
ShapeInvalid { reason: String },
}
/// Summary of a successful `reset_dispatched_with_backup` call.
#[derive(Debug, Clone)]
pub struct ResetReport {
/// Location of the `.json.bak` copy taken before the live file was modified.
pub backup_path: PathBuf,
/// Number of entries removed from `data.completed_steps`.
pub steps_removed: usize,
/// Number of requested fields that were present in `data` and removed.
pub fields_removed: usize,
}
/// Returns `true` when `s` may be used as a single file-system path segment.
///
/// A safe segment is non-empty, is not `"."`, contains no `".."` sequence, and
/// consists solely of ASCII letters, ASCII digits, `_`, `-` and `.`.
fn is_safe_segment(s: &str) -> bool {
    let allowed = |byte: u8| {
        matches!(
            byte,
            b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'_' | b'-' | b'.'
        )
    };
    match s {
        "" | "." => false,
        // Also covers the bare ".." segment.
        _ if s.contains("..") => false,
        _ => s.bytes().all(allowed),
    }
}
/// Namespaced key/value store holding JSON values.
///
/// Every operation takes a namespace `ns` and reports failures as a
/// human-readable `String`. Implementations must be `Send + Sync`.
pub trait StateStore: Send + Sync {
/// Returns the value stored under `key`, or `None` when it is absent.
fn get(&self, ns: &str, key: &str) -> Result<Option<Value>, String>;
/// Stores `value` under `key`, replacing any existing value.
fn set(&self, ns: &str, key: &str, value: Value) -> Result<(), String>;
/// Removes `key`; the boolean reports whether something was removed.
fn delete(&self, ns: &str, key: &str) -> Result<bool, String>;
/// Lists the keys of namespace `ns`.
fn keys(&self, ns: &str) -> Result<Vec<String>, String>;
/// Reports whether `key` currently has a value.
fn has(&self, ns: &str, key: &str) -> Result<bool, String>;
/// Stores `value` only when `key` is absent; returns `true` if it was stored.
fn set_nx(&self, ns: &str, key: &str, value: Value) -> Result<bool, String>;
/// Adds `delta` to the number stored under `key` (starting from `default`
/// when the key is absent) and returns the new value.
fn incr(&self, ns: &str, key: &str, delta: f64, default: f64) -> Result<f64, String>;
}
/// `StateStore` implementation that persists state as JSON files under a
/// root directory: one `{ns}.json` map per namespace, plus one
/// `{prefix}/{id}.json` file for each key of the form `prefix:id`.
pub struct JsonFileStore {
// Directory that contains every state file managed by this store.
root: PathBuf,
// One mutex per file path, created on first use; the outer mutex guards the map itself.
locks: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>>,
}
impl JsonFileStore {
/// Builds a store rooted at `root` with an empty lock table.
///
/// Nothing is touched on disk here; the root directory is created the
/// first time a path is resolved.
pub fn new(root: PathBuf) -> Self {
    let locks = Mutex::new(HashMap::new());
    Self { root, locks }
}
/// Returns the mutex associated with `path`, creating it on first request.
///
/// # Errors
/// Fails when the lock table itself is poisoned.
fn ns_lock(&self, path: &Path) -> Result<Arc<Mutex<()>>, String> {
    let mut registry = match self.locks.lock() {
        Ok(guard) => guard,
        Err(_) => return Err("state: locks map poisoned".to_string()),
    };
    let slot = registry
        .entry(path.to_path_buf())
        .or_insert_with(|| Arc::new(Mutex::new(())));
    Ok(Arc::clone(slot))
}
/// Directory under which this store keeps all of its files.
pub fn root(&self) -> &Path {
    self.root.as_path()
}
/// Creates the root directory (and any missing parents) if it does not
/// exist yet, then returns it.
fn ensure_root(&self) -> Result<&Path, String> {
    let root = self.root.as_path();
    if root.exists() {
        return Ok(root);
    }
    match fs::create_dir_all(root) {
        Ok(()) => Ok(root),
        Err(e) => Err(format!("Failed to create state dir: {e}")),
    }
}
/// Resolves the path of the namespace file `{root}/{ns}.json`.
///
/// The namespace is rejected when it is empty or contains `/`, `\`, a NUL
/// byte, or a `..` sequence. As a side effect the root directory is created
/// if it is missing.
///
/// # Errors
/// Returns a message for an invalid namespace or when the root directory
/// cannot be created.
pub fn state_path(&self, ns: &str) -> Result<PathBuf, String> {
    let has_forbidden_char = ns.chars().any(|c| matches!(c, '/' | '\\' | '\0'));
    if ns.is_empty() || has_forbidden_char || ns.contains("..") {
        return Err(format!("Invalid namespace: '{ns}'"));
    }
    self.ensure_root()
        .map(|dir| dir.join(format!("{ns}.json")))
}
/// Maps a key of the form `prefix:id` to its per-key file
/// `{root}/{prefix}/{id}.json`.
///
/// The key is split at the first `:`. Keys without a colon, or whose prefix
/// or id is not a safe path segment, yield `Ok(None)` so the caller uses the
/// namespace file instead. The root directory is created only when a path
/// is actually produced.
fn dispatch_path(&self, key: &str) -> Result<Option<PathBuf>, String> {
    match key.split_once(':') {
        Some((prefix, id)) if is_safe_segment(prefix) && is_safe_segment(id) => {
            let root = self.ensure_root()?;
            let file_name = format!("{id}.json");
            Ok(Some(root.join(prefix).join(file_name)))
        }
        _ => Ok(None),
    }
}
/// Reads and parses the per-key file at `path`.
///
/// Returns `Ok(None)` when the file does not exist; read and parse failures
/// are reported with the offending path in the message.
fn load_dispatched(&self, path: &Path) -> Result<Option<Value>, String> {
    if !path.exists() {
        return Ok(None);
    }
    let raw = fs::read_to_string(path)
        .map_err(|e| format!("Failed to read dispatched state '{}': {e}", path.display()))?;
    serde_json::from_str::<Value>(&raw)
        .map(Some)
        .map_err(|e| format!("Failed to parse dispatched state '{}': {e}", path.display()))
}
/// Writes `value` as pretty-printed JSON to the per-key file at `path`.
///
/// The parent directory is created when missing. The content is first
/// written to a sibling `.json.tmp` file which is then renamed over `path`.
fn save_dispatched(&self, path: &Path, value: &Value) -> Result<(), String> {
    match path.parent() {
        Some(dir) if !dir.exists() => {
            if let Err(e) = fs::create_dir_all(dir) {
                return Err(format!(
                    "Failed to create dispatched state dir '{}': {e}",
                    dir.display()
                ));
            }
        }
        _ => {}
    }
    let staging = path.with_extension("json.tmp");
    let body = serde_json::to_string_pretty(value)
        .map_err(|e| format!("Failed to serialize dispatched state: {e}"))?;
    fs::write(&staging, &body)
        .map_err(|e| format!("Failed to write dispatched state tmp: {e}"))?;
    fs::rename(&staging, path)
        .map_err(|e| format!("Failed to rename dispatched state file: {e}"))
}
/// Lists the keys that have a per-key file under `{root}/{namespace}/`.
///
/// Only directory entries whose name ends in `.json` are reported (with the
/// suffix stripped), which leaves out `.json.bak` and `.json.tmp` siblings.
/// The result is sorted. A namespace directory that does not exist yields an
/// empty list.
///
/// # Errors
/// `UnsafeSegment` when `namespace` is not a safe path segment, `IoRead`
/// when the directory cannot be listed.
pub fn list_dispatched(&self, namespace: &str) -> Result<Vec<String>, StateError> {
    if !is_safe_segment(namespace) {
        return Err(StateError::UnsafeSegment {
            which: "namespace",
            value: namespace.to_string(),
        });
    }
    let ns_dir = self.root.join(namespace);
    if !ns_dir.exists() {
        return Ok(Vec::new());
    }
    let mut keys = fs::read_dir(&ns_dir)
        .map_err(StateError::IoRead)?
        .filter_map(|entry| match entry {
            Err(e) => Some(Err(StateError::IoRead(e))),
            Ok(entry) => {
                let file_name = entry.file_name();
                let file_name = file_name.to_string_lossy();
                // A name ending in ".json" can end in neither ".json.bak" nor ".json.tmp".
                file_name
                    .strip_suffix(".json")
                    .map(|stem| Ok(stem.to_string()))
            }
        })
        .collect::<Result<Vec<String>, StateError>>()?;
    keys.sort();
    Ok(keys)
}
/// Returns the parsed content of the per-key file
/// `{root}/{namespace}/{key}.json`.
///
/// # Errors
/// `UnsafeSegment` when `namespace` or `key` (checked in that order) is not
/// a safe path segment, `KeyNotFound` when the file does not exist, `IoRead`
/// when it cannot be read and `Serde` when it is not valid JSON.
pub fn show_dispatched(
    &self,
    namespace: &str,
    key: &str,
) -> Result<serde_json::Value, StateError> {
    for (which, segment) in [("namespace", namespace), ("key", key)] {
        if !is_safe_segment(segment) {
            return Err(StateError::UnsafeSegment {
                which,
                value: segment.to_string(),
            });
        }
    }
    let file = self.root.join(namespace).join(format!("{key}.json"));
    if !file.exists() {
        return Err(StateError::KeyNotFound {
            namespace: namespace.to_string(),
            key: key.to_string(),
        });
    }
    let raw = fs::read_to_string(&file).map_err(StateError::IoRead)?;
    Ok(serde_json::from_str(&raw)?)
}
/// Removes selected steps and fields from the per-key file
/// `{root}/{namespace}/{key}.json`, after copying it to a `.json.bak` sibling.
///
/// The document must have a top-level `data` object. Every string entry of
/// `data.completed_steps` that equals one of `steps` is dropped, and every
/// name in `fields` is removed from `data`. The updated document is written
/// to a `.json.tmp` sibling and renamed over the live file.
///
/// Returns a [`ResetReport`] with the backup location and the number of
/// steps and fields actually removed.
///
/// # Errors
/// * `UnsafeSegment` — `namespace` or `key` is not a safe path segment.
/// * `KeyNotFound` — the per-key file does not exist.
/// * `IoBackup` / `IoRead` / `IoWrite` — copying, reading or writing failed.
/// * `Serde` — the file is not valid JSON, or serialization failed.
/// * `ShapeInvalid` — `data` is missing or not an object, `completed_steps`
///   is present but not an array (only checked when `steps` is non-empty),
///   or a lock could not be acquired.
///
/// When a shape error is returned the backup has already been written and
/// the live file is left unmodified.
pub fn reset_dispatched_with_backup(
&self,
namespace: &str,
key: &str,
steps: &[String],
fields: &[String],
) -> Result<ResetReport, StateError> {
if !is_safe_segment(namespace) {
return Err(StateError::UnsafeSegment {
which: "namespace",
value: namespace.to_string(),
});
}
if !is_safe_segment(key) {
return Err(StateError::UnsafeSegment {
which: "key",
value: key.to_string(),
});
}
let target = self.root.join(namespace).join(format!("{key}.json"));
if !target.exists() {
return Err(StateError::KeyNotFound {
namespace: namespace.to_string(),
key: key.to_string(),
});
}
// Lock failures are reported as ShapeInvalid; the enum has no dedicated variant.
let lock = self
.ns_lock(&target)
.map_err(|s| StateError::ShapeInvalid { reason: s })?;
let _guard = lock.lock().map_err(|_| StateError::ShapeInvalid {
reason: "lock poisoned".to_string(),
})?;
// The backup is a byte copy taken before parsing, so it is produced even
// when the document later turns out to be malformed.
let bak_path = target.with_extension("json.bak");
fs::copy(&target, &bak_path).map_err(StateError::IoBackup)?;
let content = fs::read_to_string(&target).map_err(StateError::IoRead)?;
let mut value: serde_json::Value = serde_json::from_str(&content)?;
let data = value
.get_mut("data")
.ok_or_else(|| StateError::ShapeInvalid {
reason: "missing 'data' top-level field".to_string(),
})?;
let data_obj = data
.as_object_mut()
.ok_or_else(|| StateError::ShapeInvalid {
reason: "'data' top-level field must be an object".to_string(),
})?;
let mut steps_removed = 0usize;
if !steps.is_empty() {
// A missing `completed_steps` is not an error: there is simply nothing to remove.
if let Some(cs) = data_obj.get_mut("completed_steps") {
if let Some(arr) = cs.as_array_mut() {
let before = arr.len();
arr.retain(|v| {
if let Some(s) = v.as_str() {
!steps.iter().any(|step| step == s)
} else {
// Non-string entries are always kept.
true
}
});
steps_removed = before - arr.len();
} else {
return Err(StateError::ShapeInvalid {
reason: "data.completed_steps must be an array".to_string(),
});
}
}
}
// Only fields that were actually present count towards the report.
let mut fields_removed = 0usize;
for field in fields {
if data_obj.remove(field.as_str()).is_some() {
fields_removed += 1;
}
}
// Write to a temporary sibling first, then rename it over the live file.
let tmp = target.with_extension("json.tmp");
let serialized = serde_json::to_string_pretty(&value)?;
fs::write(&tmp, &serialized).map_err(StateError::IoWrite)?;
fs::rename(&tmp, &target).map_err(StateError::IoWrite)?;
Ok(ResetReport {
backup_path: bak_path,
steps_removed,
fields_removed,
})
}
/// Reads the whole key/value map of namespace `ns` from `{ns}.json`.
///
/// A namespace file that does not exist is treated as an empty map.
fn load(&self, ns: &str) -> Result<HashMap<String, Value>, String> {
    let file = self.state_path(ns)?;
    if file.exists() {
        let raw = fs::read_to_string(&file)
            .map_err(|e| format!("Failed to read state '{ns}': {e}"))?;
        serde_json::from_str(&raw).map_err(|e| format!("Failed to parse state '{ns}': {e}"))
    } else {
        Ok(HashMap::new())
    }
}
/// Persists `data` as the complete content of namespace `ns`.
///
/// The pretty-printed JSON is written to a `.json.tmp` sibling and then
/// renamed over `{ns}.json`.
fn save(&self, ns: &str, data: &HashMap<String, Value>) -> Result<(), String> {
    let destination = self.state_path(ns)?;
    let staging = destination.with_extension("json.tmp");
    let body = serde_json::to_string_pretty(data)
        .map_err(|e| format!("Failed to serialize state: {e}"))?;
    fs::write(&staging, &body).map_err(|e| format!("Failed to write state tmp: {e}"))?;
    fs::rename(&staging, &destination).map_err(|e| format!("Failed to rename state file: {e}"))
}
}
impl StateStore for JsonFileStore {
/// Looks up `key`, preferring its per-key file over the namespace file.
///
/// For a `prefix:id` key the per-key file is consulted first; when it is
/// absent (or the key is not of that form) the value comes from the
/// `{ns}.json` map. The per-key lock is released before the namespace lock
/// is taken.
fn get(&self, ns: &str, key: &str) -> Result<Option<Value>, String> {
    if let Some(per_key) = self.dispatch_path(key)? {
        let mutex = self.ns_lock(&per_key)?;
        let _held = mutex
            .lock()
            .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
        let hit = self.load_dispatched(&per_key)?;
        if hit.is_some() {
            return Ok(hit);
        }
    }
    let legacy_file = self.state_path(ns)?;
    let mutex = self.ns_lock(&legacy_file)?;
    let _held = mutex
        .lock()
        .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
    let mut entries = self.load(ns)?;
    Ok(entries.remove(key))
}
/// Stores `value` under `key`.
///
/// A `prefix:id` key is written to its own per-key file and the namespace
/// file is left untouched; any other key is inserted into the `{ns}.json`
/// map, which is then rewritten.
fn set(&self, ns: &str, key: &str, value: Value) -> Result<(), String> {
    match self.dispatch_path(key)? {
        Some(per_key) => {
            let mutex = self.ns_lock(&per_key)?;
            let _held = mutex
                .lock()
                .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
            self.save_dispatched(&per_key, &value)
        }
        None => {
            let legacy_file = self.state_path(ns)?;
            let mutex = self.ns_lock(&legacy_file)?;
            let _held = mutex
                .lock()
                .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
            let mut entries = self.load(ns)?;
            entries.insert(key.to_string(), value);
            self.save(ns, &entries)
        }
    }
}
/// Removes `key` from every place it may be stored.
///
/// A `prefix:id` key can live in its per-key file and, if it predates the
/// per-key layout, also in the `{ns}.json` map. Both copies are removed:
/// if only the per-key file were deleted, the next `get`/`has` would fall
/// back to the map and the stale entry would reappear.
///
/// Returns `Ok(true)` when at least one copy was removed.
///
/// # Errors
/// Returns a message when a lock is poisoned, the per-key file cannot be
/// removed, or the namespace file cannot be read or rewritten. An invalid
/// `ns` is an error only when no per-key file was removed.
fn delete(&self, ns: &str, key: &str) -> Result<bool, String> {
    let mut removed_dispatched = false;
    if let Some(dpath) = self.dispatch_path(key)? {
        let lock = self.ns_lock(&dpath)?;
        let _guard = lock
            .lock()
            .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
        if dpath.exists() {
            fs::remove_file(&dpath).map_err(|e| {
                format!(
                    "Failed to delete dispatched state '{}': {e}",
                    dpath.display()
                )
            })?;
            removed_dispatched = true;
        }
        // The per-key guard drops here, so the two locks are never held together.
    }
    let path = match self.state_path(ns) {
        Ok(path) => path,
        // An invalid namespace has no namespace file, so there is nothing
        // left to clean up; the per-key removal above already succeeded.
        Err(_) if removed_dispatched => return Ok(true),
        Err(e) => return Err(e),
    };
    let lock = self.ns_lock(&path)?;
    let _guard = lock
        .lock()
        .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
    let mut state = self.load(ns)?;
    let removed_legacy = state.remove(key).is_some();
    // Rewrite the namespace file only when it actually changed, so a
    // per-key-only delete never creates `{ns}.json`.
    if removed_legacy {
        self.save(ns, &state)?;
    }
    Ok(removed_dispatched || removed_legacy)
}
/// Returns the keys stored in the `{ns}.json` map, in no particular order.
///
/// Keys that exist only as per-key files are not included.
fn keys(&self, ns: &str) -> Result<Vec<String>, String> {
    let legacy_file = self.state_path(ns)?;
    let mutex = self.ns_lock(&legacy_file)?;
    let _held = mutex
        .lock()
        .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
    let entries = self.load(ns)?;
    let names: Vec<String> = entries.into_iter().map(|(name, _)| name).collect();
    Ok(names)
}
/// Reports whether `key` has a value, either as an existing per-key file
/// or as an entry of the `{ns}.json` map.
fn has(&self, ns: &str, key: &str) -> Result<bool, String> {
    if let Some(per_key) = self.dispatch_path(key)? {
        let mutex = self.ns_lock(&per_key)?;
        let _held = mutex
            .lock()
            .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
        if per_key.exists() {
            return Ok(true);
        }
    }
    let legacy_file = self.state_path(ns)?;
    let mutex = self.ns_lock(&legacy_file)?;
    let _held = mutex
        .lock()
        .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
    self.load(ns).map(|entries| entries.contains_key(key))
}
/// Stores `value` under `key` only if the key has no value yet.
///
/// For a `prefix:id` key the value counts as present when either the
/// per-key file exists or the `{ns}.json` map already has the key; otherwise
/// the per-key file is created. Other keys are checked against and inserted
/// into the namespace map. Returns `true` when the value was written.
fn set_nx(&self, ns: &str, key: &str, value: Value) -> Result<bool, String> {
    if let Some(per_key) = self.dispatch_path(key)? {
        let mutex = self.ns_lock(&per_key)?;
        let _held = mutex
            .lock()
            .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
        if per_key.exists() {
            return Ok(false);
        }
        // The namespace is only validated once the per-key file is known to be absent.
        let legacy_file = self.state_path(ns)?;
        let in_legacy = legacy_file.exists() && self.load(ns)?.contains_key(key);
        if in_legacy {
            return Ok(false);
        }
        self.save_dispatched(&per_key, &value)?;
        return Ok(true);
    }
    let legacy_file = self.state_path(ns)?;
    let mutex = self.ns_lock(&legacy_file)?;
    let _held = mutex
        .lock()
        .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
    let mut entries = self.load(ns)?;
    if entries.contains_key(key) {
        Ok(false)
    } else {
        entries.insert(key.to_string(), value);
        self.save(ns, &entries)?;
        Ok(true)
    }
}
fn incr(&self, ns: &str, key: &str, delta: f64, default: f64) -> Result<f64, String> {
if let Some(dpath) = self.dispatch_path(key)? {
let lock = self.ns_lock(&dpath)?;
let _guard = lock
.lock()
.map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
let current = if let Some(v) = self.load_dispatched(&dpath)? {
v.as_f64()
.ok_or_else(|| format!("incr: value at '{key}' is not a number"))?
} else {
let path = self.state_path(ns)?;
if path.exists() {
let state = self.load(ns)?;
match state.get(key) {
Some(v) => v
.as_f64()
.ok_or_else(|| format!("incr: value at '{key}' is not a number"))?,
None => default,
}
} else {
default
}
};
let new_val = current + delta;
self.save_dispatched(&dpath, &serde_json::json!(new_val))?;
return Ok(new_val);
}
let path = self.state_path(ns)?;
let lock = self.ns_lock(&path)?;
let _guard = lock
.lock()
.map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
let mut state = self.load(ns)?;
let current = match state.get(key) {
Some(v) => v
.as_f64()
.ok_or_else(|| format!("incr: value at '{key}' is not a number"))?,
None => default,
};
let new_val = current + delta;
state.insert(key.to_string(), serde_json::json!(new_val));
self.save(ns, &state)?;
Ok(new_val)
}
}
// Unit tests for `JsonFileStore`. Each test works in its own temporary
// directory; the `TempDir` handle must stay alive for the duration of the test.
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
// Creates a store rooted in a fresh temporary directory.
fn new_store() -> (JsonFileStore, TempDir) {
let tmp = tempfile::tempdir().unwrap();
let store = JsonFileStore::new(tmp.path().to_path_buf());
(store, tmp)
}
// Basic set / get / keys / delete cycle on plain (non-dispatched) keys.
#[test]
fn roundtrip() {
let (store, _tmp) = new_store();
let ns = "rt";
store.set(ns, "count", serde_json::json!(42)).unwrap();
store
.set(ns, "name", serde_json::json!("algocline"))
.unwrap();
assert_eq!(store.get(ns, "count").unwrap(), Some(serde_json::json!(42)));
assert_eq!(
store.get(ns, "name").unwrap(),
Some(serde_json::json!("algocline"))
);
assert_eq!(store.get(ns, "missing").unwrap(), None);
let k = store.keys(ns).unwrap();
assert!(k.contains(&"count".to_string()));
assert!(k.contains(&"name".to_string()));
assert!(store.delete(ns, "count").unwrap());
assert!(!store.delete(ns, "count").unwrap());
assert_eq!(store.get(ns, "count").unwrap(), None);
}
// Namespaces with separators, "..", NUL or empty content are rejected.
#[test]
fn invalid_namespace() {
let (store, _tmp) = new_store();
assert!(store.state_path("../evil").is_err());
assert!(store.state_path("foo/bar").is_err());
assert!(store.state_path("foo\\bar").is_err());
assert!(store.state_path("").is_err());
assert!(store.state_path("foo\0bar").is_err());
}
// A namespace without a file behaves like an empty map.
#[test]
fn get_nonexistent_namespace_returns_empty() {
let (store, _tmp) = new_store();
let result = store.get("ghost_ns", "any_key").unwrap();
assert_eq!(result, None);
}
#[test]
fn keys_nonexistent_namespace_returns_empty() {
let (store, _tmp) = new_store();
let result = store.keys("ghost_ns").unwrap();
assert!(result.is_empty());
}
#[test]
fn delete_nonexistent_key_returns_false() {
let (store, _tmp) = new_store();
assert!(!store.delete("delns", "nope").unwrap());
}
#[test]
fn set_overwrites_existing_value() {
let (store, _tmp) = new_store();
let ns = "ow";
store.set(ns, "k", serde_json::json!(1)).unwrap();
store.set(ns, "k", serde_json::json!(2)).unwrap();
assert_eq!(store.get(ns, "k").unwrap(), Some(serde_json::json!(2)));
}
#[test]
fn state_path_valid_namespaces() {
let (store, _tmp) = new_store();
assert!(store.state_path("default").is_ok());
assert!(store.state_path("my-app").is_ok());
assert!(store.state_path("test_123").is_ok());
}
#[test]
fn has_returns_existence() {
let (store, _tmp) = new_store();
let ns = "hasns";
assert!(!store.has(ns, "x").unwrap());
store.set(ns, "x", serde_json::json!(1)).unwrap();
assert!(store.has(ns, "x").unwrap());
}
// The second set_nx must neither succeed nor overwrite the first value.
#[test]
fn set_nx_only_sets_if_absent() {
let (store, _tmp) = new_store();
let ns = "snx";
assert!(store.set_nx(ns, "k", serde_json::json!("first")).unwrap());
assert!(!store.set_nx(ns, "k", serde_json::json!("second")).unwrap());
assert_eq!(
store.get(ns, "k").unwrap(),
Some(serde_json::json!("first")),
"set_nx should not overwrite"
);
}
// incr starts from the default and accumulates positive and negative deltas.
#[test]
fn incr_initialises_and_increments() {
let (store, _tmp) = new_store();
let ns = "inc";
let v = store.incr(ns, "counter", 1.0, 0.0).unwrap();
assert!((v - 1.0).abs() < f64::EPSILON);
let v = store.incr(ns, "counter", 5.0, 0.0).unwrap();
assert!((v - 6.0).abs() < f64::EPSILON);
let v = store.incr(ns, "counter", -2.0, 0.0).unwrap();
assert!((v - 4.0).abs() < f64::EPSILON);
}
#[test]
fn incr_rejects_non_numeric() {
let (store, _tmp) = new_store();
let ns = "incerr";
store.set(ns, "s", serde_json::json!("hello")).unwrap();
let err = store.incr(ns, "s", 1.0, 0.0).unwrap_err();
assert!(err.contains("not a number"), "got: {err}");
}
#[test]
fn incr_custom_default() {
let (store, _tmp) = new_store();
let ns = "incdef";
let v = store.incr(ns, "score", 10.0, 100.0).unwrap();
assert!((v - 110.0).abs() < f64::EPSILON, "100 + 10 = 110");
}
// A `prefix:id` key is stored at `{root}/{prefix}/{id}.json`, not in `{ns}.json`.
#[test]
fn dispatch_writes_to_per_key_file_for_prefix_id_keys() {
let (store, tmp) = new_store();
store
.set(
"default",
"flow_orch:abc-123",
serde_json::json!({"step": 1}),
)
.unwrap();
let dispatched = tmp.path().join("flow_orch").join("abc-123.json");
assert!(
dispatched.exists(),
"dispatched file must exist at {}",
dispatched.display()
);
let legacy = tmp.path().join("default.json");
assert!(
!legacy.exists(),
"legacy default.json must not be created for dispatched keys"
);
}
// A `prefix:id` entry that only exists in the namespace file is still
// readable, and the next set() moves it to a per-key file that shadows it.
#[test]
fn dispatch_read_falls_back_to_legacy_for_unmigrated_entries() {
let (store, tmp) = new_store();
store
.set("default", "boot_marker", serde_json::json!(true))
.unwrap();
// Inject the legacy entry by editing default.json directly, since set()
// would route a `prefix:id` key to a per-key file.
let legacy_path = tmp.path().join("default.json");
let mut existing: HashMap<String, Value> =
serde_json::from_str(&std::fs::read_to_string(&legacy_path).unwrap()).unwrap();
existing.insert(
"flow_legacy:xyz".to_string(),
serde_json::json!({"old": "value"}),
);
std::fs::write(
&legacy_path,
serde_json::to_string_pretty(&existing).unwrap(),
)
.unwrap();
assert_eq!(
store.get("default", "flow_legacy:xyz").unwrap(),
Some(serde_json::json!({"old": "value"})),
"must fall back to legacy default.json when dispatched file absent"
);
store
.set(
"default",
"flow_legacy:xyz",
serde_json::json!({"new": "promoted"}),
)
.unwrap();
assert!(
tmp.path().join("flow_legacy").join("xyz.json").exists(),
"set() must promote dispatched-shaped keys to per-key file"
);
assert_eq!(
store.get("default", "flow_legacy:xyz").unwrap(),
Some(serde_json::json!({"new": "promoted"})),
"dispatched file must shadow legacy entry on subsequent reads"
);
}
// Keys without a colon, or with an unsafe prefix/id, stay in the namespace file.
#[test]
fn dispatch_skips_keys_without_colon_or_with_unsafe_segments() {
let (store, tmp) = new_store();
store
.set("default", "no_colon", serde_json::json!(1))
.unwrap();
store
.set("default", "bad/prefix:id", serde_json::json!(2))
.unwrap();
store
.set("default", "prefix:bad/id", serde_json::json!(3))
.unwrap();
store
.set("default", "prefix:..", serde_json::json!(4))
.unwrap();
let legacy = tmp.path().join("default.json");
let raw: HashMap<String, Value> =
serde_json::from_str(&std::fs::read_to_string(&legacy).unwrap()).unwrap();
assert_eq!(raw.get("no_colon"), Some(&serde_json::json!(1)));
assert_eq!(raw.get("bad/prefix:id"), Some(&serde_json::json!(2)));
assert_eq!(raw.get("prefix:bad/id"), Some(&serde_json::json!(3)));
assert_eq!(raw.get("prefix:.."), Some(&serde_json::json!(4)));
assert!(!tmp.path().join("bad").exists());
assert!(!tmp.path().join("prefix").exists());
}
#[test]
fn dispatch_delete_removes_per_key_file() {
let (store, tmp) = new_store();
store.set("default", "p:q", serde_json::json!("v")).unwrap();
let dispatched = tmp.path().join("p").join("q.json");
assert!(
dispatched.exists(),
"dispatched file should exist before delete"
);
assert!(store.delete("default", "p:q").unwrap());
assert!(
!dispatched.exists(),
"dispatched file should be removed after delete"
);
assert!(!store.delete("default", "p:q").unwrap());
}
#[test]
fn dispatch_has_reports_dispatched_file_existence() {
let (store, _tmp) = new_store();
assert!(!store.has("default", "p:q").unwrap());
store.set("default", "p:q", serde_json::json!("v")).unwrap();
assert!(store.has("default", "p:q").unwrap());
}
// set_nx on a `prefix:id` key must refuse when the key exists in either
// the namespace file or a per-key file.
#[test]
fn dispatch_set_nx_blocks_when_legacy_or_dispatched_entry_exists() {
let (store, tmp) = new_store();
store
.set("default", "boot", serde_json::json!(true))
.unwrap();
let legacy_path = tmp.path().join("default.json");
let mut existing: HashMap<String, Value> =
serde_json::from_str(&std::fs::read_to_string(&legacy_path).unwrap()).unwrap();
existing.insert("p:q".to_string(), serde_json::json!("legacy_only"));
std::fs::write(
&legacy_path,
serde_json::to_string_pretty(&existing).unwrap(),
)
.unwrap();
assert!(!store
.set_nx("default", "p:q", serde_json::json!("new"))
.unwrap());
assert!(store
.set_nx("default", "p:r", serde_json::json!("first"))
.unwrap());
assert!(tmp.path().join("p").join("r.json").exists());
assert!(!store
.set_nx("default", "p:r", serde_json::json!("second"))
.unwrap());
}
// The first incr on a `prefix:id` key starts from the namespace-file value
// and writes the result to a per-key file; later calls build on that file.
#[test]
fn dispatch_incr_promotes_legacy_value_on_first_call() {
let (store, tmp) = new_store();
store.set("default", "seed", serde_json::json!(0)).unwrap();
let legacy_path = tmp.path().join("default.json");
let mut existing: HashMap<String, Value> =
serde_json::from_str(&std::fs::read_to_string(&legacy_path).unwrap()).unwrap();
existing.insert("counter:cnt".to_string(), serde_json::json!(7));
std::fs::write(
&legacy_path,
serde_json::to_string_pretty(&existing).unwrap(),
)
.unwrap();
let result = store.incr("default", "counter:cnt", 3.0, 0.0).unwrap();
assert_eq!(result, 10.0);
let dispatched = tmp.path().join("counter").join("cnt.json");
assert!(dispatched.exists(), "dispatched file must be created");
let result2 = store.incr("default", "counter:cnt", 2.0, 0.0).unwrap();
assert_eq!(result2, 12.0);
}
#[test]
fn is_safe_segment_validates_path_safety() {
assert!(is_safe_segment("flow_orch"));
assert!(is_safe_segment("abc-123"));
assert!(is_safe_segment("v1.2.3"));
assert!(!is_safe_segment(""));
assert!(!is_safe_segment("."));
assert!(!is_safe_segment(".."));
assert!(!is_safe_segment("a..b"));
assert!(!is_safe_segment("a/b"));
assert!(!is_safe_segment("a\\b"));
assert!(!is_safe_segment("a b"));
assert!(!is_safe_segment("a\0b"));
}
// Tests for `list_dispatched` and `show_dispatched`.
mod dispatched_layout {
use super::*;
// Writes `value` directly to `{tmp}/{ns}/{key}.json`, bypassing the store.
fn seed(tmp: &TempDir, ns: &str, key: &str, value: serde_json::Value) {
let dir = tmp.path().join(ns);
fs::create_dir_all(&dir).expect("create ns dir");
let path = dir.join(format!("{key}.json"));
fs::write(
path,
serde_json::to_string_pretty(&value).expect("serialize"),
)
.expect("write seed file");
}
#[test]
fn list_returns_json_keys_only() {
let (store, tmp) = new_store();
seed(&tmp, "myns", "alpha", serde_json::json!(1));
seed(&tmp, "myns", "beta", serde_json::json!(2));
let ns_dir = tmp.path().join("myns");
fs::write(ns_dir.join("alpha.json.bak"), b"backup").expect("write bak");
fs::write(ns_dir.join("alpha.json.tmp"), b"tmp").expect("write tmp");
fs::write(ns_dir.join("notes.txt"), b"text").expect("write txt");
let keys = store.list_dispatched("myns").unwrap();
assert_eq!(
keys,
vec!["alpha", "beta"],
"must be sorted, .bak/.tmp/.txt excluded"
);
}
#[test]
fn list_returns_empty_for_absent_namespace() {
let (store, _tmp) = new_store();
let keys = store.list_dispatched("ghost").unwrap();
assert!(keys.is_empty(), "absent namespace should return empty Vec");
}
#[test]
fn list_returns_empty_when_only_non_json_files_present() {
let (store, tmp) = new_store();
let ns_dir = tmp.path().join("empty_ns");
fs::create_dir_all(&ns_dir).expect("create dir");
fs::write(ns_dir.join("readme.txt"), b"hi").expect("write");
let keys = store.list_dispatched("empty_ns").unwrap();
assert!(keys.is_empty());
}
#[test]
fn show_returns_key_not_found_for_absent_namespace() {
let (store, _tmp) = new_store();
let err = store.show_dispatched("nodir", "anykey").unwrap_err();
assert!(
matches!(err, StateError::KeyNotFound { .. }),
"expected KeyNotFound, got: {err}"
);
assert!(err.to_string().contains("not found"), "{err}");
}
#[test]
fn show_returns_key_not_found_for_absent_key() {
let (store, tmp) = new_store();
let ns_dir = tmp.path().join("myns2");
fs::create_dir_all(&ns_dir).expect("create dir");
let err = store.show_dispatched("myns2", "missing").unwrap_err();
assert!(
matches!(err, StateError::KeyNotFound { .. }),
"expected KeyNotFound, got: {err}"
);
}
#[test]
fn show_returns_full_value_for_existing_key() {
let (store, tmp) = new_store();
let expected = serde_json::json!({"data": {"completed_steps": ["a", "b"], "x": 42}});
seed(&tmp, "showns", "task1", expected.clone());
let result = store.show_dispatched("showns", "task1").unwrap();
assert_eq!(result, expected);
}
}
// Tests for `reset_dispatched_with_backup`: backup creation, mutation of
// the live file, and shape validation.
mod reset_atomicity {
use super::*;
// Writes `value` directly to `{tmp}/{ns}/{key}.json`, bypassing the store.
fn seed(tmp: &TempDir, ns: &str, key: &str, value: serde_json::Value) {
let dir = tmp.path().join(ns);
fs::create_dir_all(&dir).expect("create ns dir");
let path = dir.join(format!("{key}.json"));
fs::write(
path,
serde_json::to_string_pretty(&value).expect("serialize"),
)
.expect("write seed");
}
// The .bak file holds the original document while the live file loses
// the requested step and field.
#[test]
fn reset_removes_steps_and_fields_and_creates_backup() {
let (store, tmp) = new_store();
let original = serde_json::json!({
"data": {
"completed_steps": ["a", "b", "c"],
"x": 1,
"y": "hello"
}
});
seed(&tmp, "testns", "task1", original.clone());
let report = store
.reset_dispatched_with_backup(
"testns",
"task1",
&["b".to_string()],
&["x".to_string()],
)
.unwrap();
let bak_path = tmp.path().join("testns").join("task1.json.bak");
assert!(
bak_path.exists(),
".bak file must exist at {}",
bak_path.display()
);
assert_eq!(report.backup_path, bak_path);
let bak_content: serde_json::Value =
serde_json::from_str(&fs::read_to_string(&bak_path).expect("read bak"))
.expect("parse bak");
assert_eq!(bak_content, original, ".bak must contain original content");
let live_path = tmp.path().join("testns").join("task1.json");
let live_content: serde_json::Value =
serde_json::from_str(&fs::read_to_string(&live_path).expect("read live"))
.expect("parse live");
let expected = serde_json::json!({
"data": {
"completed_steps": ["a", "c"],
"y": "hello"
}
});
assert_eq!(live_content, expected, "live file must be mutated");
assert_eq!(report.steps_removed, 1, "one step removed");
assert_eq!(report.fields_removed, 1, "one field removed");
}
#[test]
fn reset_removes_multiple_steps_and_fields() {
let (store, tmp) = new_store();
let original = serde_json::json!({
"data": {
"completed_steps": ["s1", "s2", "s3", "s4"],
"repo_readiness": "NOT_READY",
"repo_readiness_report": "details here",
"plan_gate_retries": 2
}
});
seed(&tmp, "orchns", "task-abc", original.clone());
let report = store
.reset_dispatched_with_backup(
"orchns",
"task-abc",
&["s2".to_string(), "s3".to_string()],
&[
"repo_readiness".to_string(),
"repo_readiness_report".to_string(),
],
)
.unwrap();
let live_path = tmp.path().join("orchns").join("task-abc.json");
let live: serde_json::Value =
serde_json::from_str(&fs::read_to_string(&live_path).expect("read"))
.expect("parse");
assert_eq!(
live["data"]["completed_steps"],
serde_json::json!(["s1", "s4"])
);
assert!(live["data"].get("repo_readiness").is_none());
assert!(live["data"].get("repo_readiness_report").is_none());
assert_eq!(live["data"]["plan_gate_retries"], 2);
assert_eq!(report.steps_removed, 2);
assert_eq!(report.fields_removed, 2);
}
#[test]
fn reset_returns_key_not_found_for_absent_file() {
let (store, _tmp) = new_store();
let err = store
.reset_dispatched_with_backup("ns", "missing", &[], &[])
.unwrap_err();
assert!(
matches!(err, StateError::KeyNotFound { .. }),
"expected KeyNotFound, got: {err}"
);
}
// A document without a top-level `data` field is rejected.
#[test]
fn reset_returns_shape_invalid_when_data_absent() {
let (store, tmp) = new_store();
let bad = serde_json::json!({"identity": {"task_id": "t1"}});
let dir = tmp.path().join("badns");
fs::create_dir_all(&dir).expect("create dir");
fs::write(
dir.join("k.json"),
serde_json::to_string_pretty(&bad).expect("ser"),
)
.expect("write");
let err = store
.reset_dispatched_with_backup("badns", "k", &["s".to_string()], &[])
.unwrap_err();
assert!(
matches!(err, StateError::ShapeInvalid { .. }),
"expected ShapeInvalid, got: {err}"
);
assert!(err.to_string().contains("data"), "{err}");
}
// `completed_steps` must be an array when steps are requested for removal.
#[test]
fn reset_returns_shape_invalid_when_completed_steps_not_array() {
let (store, tmp) = new_store();
let bad = serde_json::json!({"data": {"completed_steps": {"step": "a"}}});
let dir = tmp.path().join("badns2");
fs::create_dir_all(&dir).expect("create dir");
fs::write(
dir.join("k.json"),
serde_json::to_string_pretty(&bad).expect("ser"),
)
.expect("write");
let err = store
.reset_dispatched_with_backup("badns2", "k", &["a".to_string()], &[])
.unwrap_err();
assert!(
matches!(err, StateError::ShapeInvalid { .. }),
"expected ShapeInvalid, got: {err}"
);
assert!(
err.to_string().contains("completed_steps"),
"message should mention completed_steps: {err}"
);
}
}
// The typed APIs must reject namespaces and keys that are not safe path segments.
mod path_traversal {
use super::*;
#[test]
fn list_rejects_unsafe_namespace() {
let (store, _tmp) = new_store();
let err = store.list_dispatched("../evil").unwrap_err();
assert!(
matches!(
err,
StateError::UnsafeSegment {
which: "namespace",
..
}
),
"expected UnsafeSegment{{namespace}}, got: {err}"
);
}
#[test]
fn show_rejects_unsafe_key() {
let (store, _tmp) = new_store();
let err = store.show_dispatched("ns", "foo/bar").unwrap_err();
assert!(
matches!(err, StateError::UnsafeSegment { which: "key", .. }),
"expected UnsafeSegment{{key}}, got: {err}"
);
}
#[test]
fn reset_rejects_empty_namespace() {
let (store, _tmp) = new_store();
let err = store
.reset_dispatched_with_backup("", "key", &[], &[])
.unwrap_err();
assert!(
matches!(
err,
StateError::UnsafeSegment {
which: "namespace",
..
}
),
"expected UnsafeSegment{{namespace}}, got: {err}"
);
}
#[test]
fn reset_rejects_dotdot_key() {
let (store, _tmp) = new_store();
let err = store
.reset_dispatched_with_backup("ns", "..", &[], &[])
.unwrap_err();
assert!(
matches!(err, StateError::UnsafeSegment { which: "key", .. }),
"expected UnsafeSegment{{key}}, got: {err}"
);
}
}
}
// Property-based tests (proptest) for round-tripping values and for
// namespace validation in `state_path`.
#[cfg(test)]
mod proptests {
use super::*;
use proptest::prelude::*;
// Creates a store rooted in a fresh temporary directory.
fn new_store() -> (JsonFileStore, tempfile::TempDir) {
let tmp = tempfile::tempdir().unwrap();
let store = JsonFileStore::new(tmp.path().to_path_buf());
(store, tmp)
}
proptest! {
// Any i64 stored under a lowercase key (no colon, so never dispatched)
// is read back unchanged.
#[test]
fn roundtrip_arbitrary_values(
key in "[a-z]{1,20}",
val in any::<i64>(),
) {
let (store, _tmp) = new_store();
let ns = "rt";
let json_val = serde_json::json!(val);
store.set(ns, &key, json_val.clone()).unwrap();
let got = store.get(ns, &key).unwrap();
prop_assert_eq!(got, Some(json_val));
let _ = store.delete(ns, &key);
}
// Any namespace containing "/../" is rejected regardless of what surrounds it.
#[test]
fn traversal_always_rejected(
prefix in "[a-z]{0,5}",
suffix in "[a-z]{0,5}",
) {
let (store, _tmp) = new_store();
let evil = format!("{prefix}/../{suffix}");
prop_assert!(store.state_path(&evil).is_err());
}
// Any namespace containing a NUL byte is rejected.
#[test]
fn nul_byte_always_rejected(
prefix in "[a-z]{0,10}",
suffix in "[a-z]{0,10}",
) {
let (store, _tmp) = new_store();
let evil = format!("{prefix}\0{suffix}");
prop_assert!(store.state_path(&evil).is_err());
}
}
}