1use std::collections::HashMap;
49use std::fs;
50use std::path::{Path, PathBuf};
51use std::sync::{Arc, Mutex};
52
53use serde_json::Value;
54
55#[derive(thiserror::Error, Debug)]
67pub enum StateError {
68 #[error("state: key '{key}' not found in namespace '{namespace}'")]
74 KeyNotFound { namespace: String, key: String },
75
76 #[error("state: unsafe {which} segment '{value}'")]
82 UnsafeSegment { which: &'static str, value: String },
83
84 #[error("state: backup I/O failed: {0}")]
90 IoBackup(#[source] std::io::Error),
91
92 #[error("state: read failed: {0}")]
97 IoRead(#[source] std::io::Error),
98
99 #[error("state: write failed: {0}")]
105 IoWrite(#[source] std::io::Error),
106
107 #[error("state: serialize/parse failed: {0}")]
111 Serde(#[from] serde_json::Error),
112
113 #[error("state: shape invalid: {reason}")]
120 ShapeInvalid { reason: String },
121}
122
123#[derive(Debug, Clone)]
130pub struct ResetReport {
131 pub backup_path: PathBuf,
133 pub steps_removed: usize,
135 pub fields_removed: usize,
137}
138
139fn is_safe_segment(s: &str) -> bool {
147 if s.is_empty() || s == "." || s == ".." {
148 return false;
149 }
150 if s.contains("..") {
151 return false;
152 }
153 s.bytes()
154 .all(|b| b.is_ascii_alphanumeric() || b == b'_' || b == b'-' || b == b'.')
155}
156
157pub trait StateStore: Send + Sync {
166 fn get(&self, ns: &str, key: &str) -> Result<Option<Value>, String>;
168
169 fn set(&self, ns: &str, key: &str, value: Value) -> Result<(), String>;
171
172 fn delete(&self, ns: &str, key: &str) -> Result<bool, String>;
174
175 fn keys(&self, ns: &str) -> Result<Vec<String>, String>;
177
178 fn has(&self, ns: &str, key: &str) -> Result<bool, String>;
184
185 fn set_nx(&self, ns: &str, key: &str, value: Value) -> Result<bool, String>;
195
196 fn incr(&self, ns: &str, key: &str, delta: f64, default: f64) -> Result<f64, String>;
212}
213
214pub struct JsonFileStore {
244 root: PathBuf,
245 locks: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>>,
248}
249
250impl JsonFileStore {
251 pub fn new(root: PathBuf) -> Self {
256 Self {
257 root,
258 locks: Mutex::new(HashMap::new()),
259 }
260 }
261
262 fn ns_lock(&self, path: &Path) -> Result<Arc<Mutex<()>>, String> {
269 let mut map = self
270 .locks
271 .lock()
272 .map_err(|_| "state: locks map poisoned".to_string())?;
273 Ok(Arc::clone(
274 map.entry(path.to_path_buf())
275 .or_insert_with(|| Arc::new(Mutex::new(()))),
276 ))
277 }
278
279 pub fn root(&self) -> &Path {
281 &self.root
282 }
283
284 fn ensure_root(&self) -> Result<&Path, String> {
286 if !self.root.exists() {
287 fs::create_dir_all(&self.root)
288 .map_err(|e| format!("Failed to create state dir: {e}"))?;
289 }
290 Ok(&self.root)
291 }
292
293 pub fn state_path(&self, ns: &str) -> Result<PathBuf, String> {
296 if ns.contains('/')
297 || ns.contains('\\')
298 || ns.contains("..")
299 || ns.contains('\0')
300 || ns.is_empty()
301 {
302 return Err(format!("Invalid namespace: '{ns}'"));
303 }
304 let dir = self.ensure_root()?;
305 Ok(dir.join(format!("{ns}.json")))
306 }
307
308 fn dispatch_path(&self, key: &str) -> Result<Option<PathBuf>, String> {
323 let (prefix, id) = match key.split_once(':') {
324 Some(pair) => pair,
325 None => return Ok(None),
326 };
327 if !is_safe_segment(prefix) || !is_safe_segment(id) {
328 return Ok(None);
329 }
330 let dir = self.ensure_root()?;
331 Ok(Some(dir.join(prefix).join(format!("{id}.json"))))
332 }
333
334 fn load_dispatched(&self, path: &Path) -> Result<Option<Value>, String> {
337 if !path.exists() {
338 return Ok(None);
339 }
340 let content = fs::read_to_string(path)
341 .map_err(|e| format!("Failed to read dispatched state '{}': {e}", path.display()))?;
342 let v: Value = serde_json::from_str(&content)
343 .map_err(|e| format!("Failed to parse dispatched state '{}': {e}", path.display()))?;
344 Ok(Some(v))
345 }
346
347 fn save_dispatched(&self, path: &Path, value: &Value) -> Result<(), String> {
350 if let Some(parent) = path.parent() {
351 if !parent.exists() {
352 fs::create_dir_all(parent).map_err(|e| {
353 format!(
354 "Failed to create dispatched state dir '{}': {e}",
355 parent.display()
356 )
357 })?;
358 }
359 }
360 let tmp = path.with_extension("json.tmp");
361 let content = serde_json::to_string_pretty(value)
362 .map_err(|e| format!("Failed to serialize dispatched state: {e}"))?;
363 fs::write(&tmp, &content)
364 .map_err(|e| format!("Failed to write dispatched state tmp: {e}"))?;
365 fs::rename(&tmp, path)
366 .map_err(|e| format!("Failed to rename dispatched state file: {e}"))?;
367 Ok(())
368 }
369
370 pub fn list_dispatched(&self, namespace: &str) -> Result<Vec<String>, StateError> {
392 if !is_safe_segment(namespace) {
393 return Err(StateError::UnsafeSegment {
394 which: "namespace",
395 value: namespace.to_string(),
396 });
397 }
398 let ns_dir = self.root.join(namespace);
399 if !ns_dir.exists() {
400 return Ok(Vec::new());
401 }
402 let mut keys = Vec::new();
403 let entries = fs::read_dir(&ns_dir).map_err(StateError::IoRead)?;
404 for entry in entries {
405 let entry = entry.map_err(StateError::IoRead)?;
406 let fname = entry.file_name();
407 let fname_str = fname.to_string_lossy();
408 if !fname_str.ends_with(".json")
410 || fname_str.ends_with(".json.bak")
411 || fname_str.ends_with(".json.tmp")
412 {
413 continue;
414 }
415 let key = fname_str
417 .strip_suffix(".json")
418 .unwrap_or(&fname_str)
419 .to_string();
420 keys.push(key);
421 }
422 keys.sort();
423 Ok(keys)
424 }
425
426 pub fn show_dispatched(
443 &self,
444 namespace: &str,
445 key: &str,
446 ) -> Result<serde_json::Value, StateError> {
447 if !is_safe_segment(namespace) {
448 return Err(StateError::UnsafeSegment {
449 which: "namespace",
450 value: namespace.to_string(),
451 });
452 }
453 if !is_safe_segment(key) {
454 return Err(StateError::UnsafeSegment {
455 which: "key",
456 value: key.to_string(),
457 });
458 }
459 let target = self.root.join(namespace).join(format!("{key}.json"));
460 if !target.exists() {
461 return Err(StateError::KeyNotFound {
462 namespace: namespace.to_string(),
463 key: key.to_string(),
464 });
465 }
466 let content = fs::read_to_string(&target).map_err(StateError::IoRead)?;
467 let value: serde_json::Value = serde_json::from_str(&content)?;
468 Ok(value)
469 }
470
471 pub fn set_dispatched(
488 &self,
489 ns: &str,
490 key: &str,
491 value: &serde_json::Value,
492 ) -> Result<(), StateError> {
493 if !is_safe_segment(ns) {
494 return Err(StateError::UnsafeSegment {
495 which: "namespace",
496 value: ns.to_string(),
497 });
498 }
499 if !is_safe_segment(key) {
500 return Err(StateError::UnsafeSegment {
501 which: "key",
502 value: key.to_string(),
503 });
504 }
505 let target = self.root.join(ns).join(format!("{key}.json"));
506 if !target.exists() {
507 return self
509 .save_dispatched(&target, value)
510 .map_err(|s| StateError::IoWrite(std::io::Error::other(s)));
511 }
512 let target_bak = target.with_extension("json.bak");
514 fs::copy(&target, &target_bak).map_err(StateError::IoBackup)?;
515 self.save_dispatched(&target, value)
516 .map_err(|s| StateError::IoWrite(std::io::Error::other(s)))
517 }
518
519 pub fn delete_dispatched(&self, ns: &str, key: &str) -> Result<bool, StateError> {
535 if !is_safe_segment(ns) {
536 return Err(StateError::UnsafeSegment {
537 which: "namespace",
538 value: ns.to_string(),
539 });
540 }
541 if !is_safe_segment(key) {
542 return Err(StateError::UnsafeSegment {
543 which: "key",
544 value: key.to_string(),
545 });
546 }
547 let target = self.root.join(ns).join(format!("{key}.json"));
548 if !target.exists() {
549 return Ok(false);
551 }
552 let target_bak = target.with_extension("json.bak");
554 fs::copy(&target, &target_bak).map_err(StateError::IoBackup)?;
555 fs::remove_file(&target).map_err(StateError::IoWrite)?;
556 Ok(true)
557 }
558
559 pub fn reset_dispatched_with_backup(
602 &self,
603 namespace: &str,
604 key: &str,
605 steps: &[String],
606 fields: &[String],
607 ) -> Result<ResetReport, StateError> {
608 if !is_safe_segment(namespace) {
610 return Err(StateError::UnsafeSegment {
611 which: "namespace",
612 value: namespace.to_string(),
613 });
614 }
615 if !is_safe_segment(key) {
616 return Err(StateError::UnsafeSegment {
617 which: "key",
618 value: key.to_string(),
619 });
620 }
621
622 let target = self.root.join(namespace).join(format!("{key}.json"));
624
625 if !target.exists() {
627 return Err(StateError::KeyNotFound {
628 namespace: namespace.to_string(),
629 key: key.to_string(),
630 });
631 }
632
633 let lock = self
635 .ns_lock(&target)
636 .map_err(|s| StateError::ShapeInvalid { reason: s })?;
637 let _guard = lock.lock().map_err(|_| StateError::ShapeInvalid {
638 reason: "lock poisoned".to_string(),
639 })?;
640
641 let bak_path = target.with_extension("json.bak");
643 fs::copy(&target, &bak_path).map_err(StateError::IoBackup)?;
644
645 let content = fs::read_to_string(&target).map_err(StateError::IoRead)?;
647 let mut value: serde_json::Value = serde_json::from_str(&content)?;
648
649 let data = value
651 .get_mut("data")
652 .ok_or_else(|| StateError::ShapeInvalid {
653 reason: "missing 'data' top-level field".to_string(),
654 })?;
655 let data_obj = data
656 .as_object_mut()
657 .ok_or_else(|| StateError::ShapeInvalid {
658 reason: "'data' top-level field must be an object".to_string(),
659 })?;
660
661 let mut steps_removed = 0usize;
663 if !steps.is_empty() {
664 if let Some(cs) = data_obj.get_mut("completed_steps") {
665 if let Some(arr) = cs.as_array_mut() {
666 let before = arr.len();
667 arr.retain(|v| {
668 if let Some(s) = v.as_str() {
669 !steps.iter().any(|step| step == s)
670 } else {
671 true
672 }
673 });
674 steps_removed = before - arr.len();
675 } else {
676 return Err(StateError::ShapeInvalid {
677 reason: "data.completed_steps must be an array".to_string(),
678 });
679 }
680 }
681 }
683
684 let mut fields_removed = 0usize;
686 for field in fields {
687 if data_obj.remove(field.as_str()).is_some() {
688 fields_removed += 1;
689 }
690 }
691
692 let tmp = target.with_extension("json.tmp");
694 let serialized = serde_json::to_string_pretty(&value)?;
695 fs::write(&tmp, &serialized).map_err(StateError::IoWrite)?;
696
697 fs::rename(&tmp, &target).map_err(StateError::IoWrite)?;
699
700 Ok(ResetReport {
701 backup_path: bak_path,
702 steps_removed,
703 fields_removed,
704 })
705 }
706
707 fn load(&self, ns: &str) -> Result<HashMap<String, Value>, String> {
708 let path = self.state_path(ns)?;
709 if !path.exists() {
710 return Ok(HashMap::new());
711 }
712 let content =
713 fs::read_to_string(&path).map_err(|e| format!("Failed to read state '{ns}': {e}"))?;
714 serde_json::from_str(&content).map_err(|e| format!("Failed to parse state '{ns}': {e}"))
715 }
716
717 fn save(&self, ns: &str, data: &HashMap<String, Value>) -> Result<(), String> {
718 let path = self.state_path(ns)?;
719 let tmp = path.with_extension("json.tmp");
720 let content = serde_json::to_string_pretty(data)
721 .map_err(|e| format!("Failed to serialize state: {e}"))?;
722 fs::write(&tmp, &content).map_err(|e| format!("Failed to write state tmp: {e}"))?;
723 fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename state file: {e}"))?;
724 Ok(())
725 }
726}
727
728impl StateStore for JsonFileStore {
729 fn get(&self, ns: &str, key: &str) -> Result<Option<Value>, String> {
730 if let Some(dpath) = self.dispatch_path(key)? {
735 let lock = self.ns_lock(&dpath)?;
736 let _guard = lock
737 .lock()
738 .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
739 if let Some(v) = self.load_dispatched(&dpath)? {
740 return Ok(Some(v));
741 }
742 }
745 let path = self.state_path(ns)?;
746 let lock = self.ns_lock(&path)?;
747 let _guard = lock
748 .lock()
749 .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
750 let state = self.load(ns)?;
751 Ok(state.get(key).cloned())
752 }
753
754 fn set(&self, ns: &str, key: &str, value: Value) -> Result<(), String> {
755 if let Some(dpath) = self.dispatch_path(key)? {
756 let lock = self.ns_lock(&dpath)?;
757 let _guard = lock
758 .lock()
759 .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
760 return self.save_dispatched(&dpath, &value);
761 }
762 let path = self.state_path(ns)?;
763 let lock = self.ns_lock(&path)?;
764 let _guard = lock
765 .lock()
766 .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
767 let mut state = self.load(ns)?;
768 state.insert(key.to_string(), value);
769 self.save(ns, &state)
770 }
771
772 fn delete(&self, ns: &str, key: &str) -> Result<bool, String> {
773 if let Some(dpath) = self.dispatch_path(key)? {
774 let lock = self.ns_lock(&dpath)?;
775 let _guard = lock
776 .lock()
777 .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
778 if dpath.exists() {
779 fs::remove_file(&dpath).map_err(|e| {
780 format!(
781 "Failed to delete dispatched state '{}': {e}",
782 dpath.display()
783 )
784 })?;
785 return Ok(true);
786 }
787 }
790 let path = self.state_path(ns)?;
791 let lock = self.ns_lock(&path)?;
792 let _guard = lock
793 .lock()
794 .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
795 let mut state = self.load(ns)?;
796 let existed = state.remove(key).is_some();
797 if existed {
798 self.save(ns, &state)?;
799 }
800 Ok(existed)
801 }
802
803 fn keys(&self, ns: &str) -> Result<Vec<String>, String> {
804 let path = self.state_path(ns)?;
805 let lock = self.ns_lock(&path)?;
806 let _guard = lock
807 .lock()
808 .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
809 let state = self.load(ns)?;
810 Ok(state.keys().cloned().collect())
811 }
812
813 fn has(&self, ns: &str, key: &str) -> Result<bool, String> {
814 if let Some(dpath) = self.dispatch_path(key)? {
815 let lock = self.ns_lock(&dpath)?;
816 let _guard = lock
817 .lock()
818 .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
819 if dpath.exists() {
820 return Ok(true);
821 }
822 }
824 let path = self.state_path(ns)?;
825 let lock = self.ns_lock(&path)?;
826 let _guard = lock
827 .lock()
828 .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
829 let state = self.load(ns)?;
830 Ok(state.contains_key(key))
831 }
832
833 fn set_nx(&self, ns: &str, key: &str, value: Value) -> Result<bool, String> {
834 if let Some(dpath) = self.dispatch_path(key)? {
835 let lock = self.ns_lock(&dpath)?;
836 let _guard = lock
837 .lock()
838 .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
839 if dpath.exists() {
840 return Ok(false);
841 }
842 let path = self.state_path(ns)?;
845 if path.exists() {
846 let state = self.load(ns)?;
847 if state.contains_key(key) {
848 return Ok(false);
849 }
850 }
851 self.save_dispatched(&dpath, &value)?;
852 return Ok(true);
853 }
854 let path = self.state_path(ns)?;
855 let lock = self.ns_lock(&path)?;
856 let _guard = lock
857 .lock()
858 .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
859 let mut state = self.load(ns)?;
860 if state.contains_key(key) {
861 return Ok(false);
862 }
863 state.insert(key.to_string(), value);
864 self.save(ns, &state)?;
865 Ok(true)
866 }
867
868 fn incr(&self, ns: &str, key: &str, delta: f64, default: f64) -> Result<f64, String> {
869 if let Some(dpath) = self.dispatch_path(key)? {
870 let lock = self.ns_lock(&dpath)?;
871 let _guard = lock
872 .lock()
873 .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
874 let current = if let Some(v) = self.load_dispatched(&dpath)? {
875 v.as_f64()
876 .ok_or_else(|| format!("incr: value at '{key}' is not a number"))?
877 } else {
878 let path = self.state_path(ns)?;
881 if path.exists() {
882 let state = self.load(ns)?;
883 match state.get(key) {
884 Some(v) => v
885 .as_f64()
886 .ok_or_else(|| format!("incr: value at '{key}' is not a number"))?,
887 None => default,
888 }
889 } else {
890 default
891 }
892 };
893 let new_val = current + delta;
894 self.save_dispatched(&dpath, &serde_json::json!(new_val))?;
895 return Ok(new_val);
896 }
897 let path = self.state_path(ns)?;
898 let lock = self.ns_lock(&path)?;
899 let _guard = lock
900 .lock()
901 .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
902 let mut state = self.load(ns)?;
903 let current = match state.get(key) {
904 Some(v) => v
905 .as_f64()
906 .ok_or_else(|| format!("incr: value at '{key}' is not a number"))?,
907 None => default,
908 };
909 let new_val = current + delta;
910 state.insert(key.to_string(), serde_json::json!(new_val));
911 self.save(ns, &state)?;
912 Ok(new_val)
913 }
914}
915
916#[cfg(test)]
917mod tests {
918 use super::*;
919 use tempfile::TempDir;
920
921 fn new_store() -> (JsonFileStore, TempDir) {
924 let tmp = tempfile::tempdir().unwrap();
925 let store = JsonFileStore::new(tmp.path().to_path_buf());
926 (store, tmp)
927 }
928
929 #[test]
930 fn roundtrip() {
931 let (store, _tmp) = new_store();
932 let ns = "rt";
933
934 store.set(ns, "count", serde_json::json!(42)).unwrap();
935 store
936 .set(ns, "name", serde_json::json!("algocline"))
937 .unwrap();
938
939 assert_eq!(store.get(ns, "count").unwrap(), Some(serde_json::json!(42)));
940 assert_eq!(
941 store.get(ns, "name").unwrap(),
942 Some(serde_json::json!("algocline"))
943 );
944 assert_eq!(store.get(ns, "missing").unwrap(), None);
945
946 let k = store.keys(ns).unwrap();
947 assert!(k.contains(&"count".to_string()));
948 assert!(k.contains(&"name".to_string()));
949
950 assert!(store.delete(ns, "count").unwrap());
951 assert!(!store.delete(ns, "count").unwrap());
952 assert_eq!(store.get(ns, "count").unwrap(), None);
953 }
954
955 #[test]
956 fn invalid_namespace() {
957 let (store, _tmp) = new_store();
958 assert!(store.state_path("../evil").is_err());
959 assert!(store.state_path("foo/bar").is_err());
960 assert!(store.state_path("foo\\bar").is_err());
961 assert!(store.state_path("").is_err());
962 assert!(store.state_path("foo\0bar").is_err());
963 }
964
965 #[test]
966 fn get_nonexistent_namespace_returns_empty() {
967 let (store, _tmp) = new_store();
968 let result = store.get("ghost_ns", "any_key").unwrap();
969 assert_eq!(result, None);
970 }
971
972 #[test]
973 fn keys_nonexistent_namespace_returns_empty() {
974 let (store, _tmp) = new_store();
975 let result = store.keys("ghost_ns").unwrap();
976 assert!(result.is_empty());
977 }
978
979 #[test]
980 fn delete_nonexistent_key_returns_false() {
981 let (store, _tmp) = new_store();
982 assert!(!store.delete("delns", "nope").unwrap());
983 }
984
985 #[test]
986 fn set_overwrites_existing_value() {
987 let (store, _tmp) = new_store();
988 let ns = "ow";
989
990 store.set(ns, "k", serde_json::json!(1)).unwrap();
991 store.set(ns, "k", serde_json::json!(2)).unwrap();
992 assert_eq!(store.get(ns, "k").unwrap(), Some(serde_json::json!(2)));
993 }
994
995 #[test]
996 fn state_path_valid_namespaces() {
997 let (store, _tmp) = new_store();
998 assert!(store.state_path("default").is_ok());
999 assert!(store.state_path("my-app").is_ok());
1000 assert!(store.state_path("test_123").is_ok());
1001 }
1002
1003 #[test]
1006 fn has_returns_existence() {
1007 let (store, _tmp) = new_store();
1008 let ns = "hasns";
1009
1010 assert!(!store.has(ns, "x").unwrap());
1011 store.set(ns, "x", serde_json::json!(1)).unwrap();
1012 assert!(store.has(ns, "x").unwrap());
1013 }
1014
1015 #[test]
1016 fn set_nx_only_sets_if_absent() {
1017 let (store, _tmp) = new_store();
1018 let ns = "snx";
1019
1020 assert!(store.set_nx(ns, "k", serde_json::json!("first")).unwrap());
1021 assert!(!store.set_nx(ns, "k", serde_json::json!("second")).unwrap());
1022 assert_eq!(
1023 store.get(ns, "k").unwrap(),
1024 Some(serde_json::json!("first")),
1025 "set_nx should not overwrite"
1026 );
1027 }
1028
1029 #[test]
1030 fn incr_initialises_and_increments() {
1031 let (store, _tmp) = new_store();
1032 let ns = "inc";
1033
1034 let v = store.incr(ns, "counter", 1.0, 0.0).unwrap();
1036 assert!((v - 1.0).abs() < f64::EPSILON);
1037
1038 let v = store.incr(ns, "counter", 5.0, 0.0).unwrap();
1040 assert!((v - 6.0).abs() < f64::EPSILON);
1041
1042 let v = store.incr(ns, "counter", -2.0, 0.0).unwrap();
1044 assert!((v - 4.0).abs() < f64::EPSILON);
1045 }
1046
1047 #[test]
1048 fn incr_rejects_non_numeric() {
1049 let (store, _tmp) = new_store();
1050 let ns = "incerr";
1051
1052 store.set(ns, "s", serde_json::json!("hello")).unwrap();
1053 let err = store.incr(ns, "s", 1.0, 0.0).unwrap_err();
1054 assert!(err.contains("not a number"), "got: {err}");
1055 }
1056
1057 #[test]
1058 fn incr_custom_default() {
1059 let (store, _tmp) = new_store();
1060 let ns = "incdef";
1061
1062 let v = store.incr(ns, "score", 10.0, 100.0).unwrap();
1063 assert!((v - 110.0).abs() < f64::EPSILON, "100 + 10 = 110");
1064 }
1065
1066 #[test]
1072 fn dispatch_writes_to_per_key_file_for_prefix_id_keys() {
1073 let (store, tmp) = new_store();
1074 store
1075 .set(
1076 "default",
1077 "flow_orch:abc-123",
1078 serde_json::json!({"step": 1}),
1079 )
1080 .unwrap();
1081 let dispatched = tmp.path().join("flow_orch").join("abc-123.json");
1082 assert!(
1083 dispatched.exists(),
1084 "dispatched file must exist at {}",
1085 dispatched.display()
1086 );
1087 let legacy = tmp.path().join("default.json");
1089 assert!(
1090 !legacy.exists(),
1091 "legacy default.json must not be created for dispatched keys"
1092 );
1093 }
1094
1095 #[test]
1099 fn dispatch_read_falls_back_to_legacy_for_unmigrated_entries() {
1100 let (store, tmp) = new_store();
1101 store
1106 .set("default", "boot_marker", serde_json::json!(true))
1107 .unwrap();
1108 let legacy_path = tmp.path().join("default.json");
1109 let mut existing: HashMap<String, Value> =
1110 serde_json::from_str(&std::fs::read_to_string(&legacy_path).unwrap()).unwrap();
1111 existing.insert(
1112 "flow_legacy:xyz".to_string(),
1113 serde_json::json!({"old": "value"}),
1114 );
1115 std::fs::write(
1116 &legacy_path,
1117 serde_json::to_string_pretty(&existing).unwrap(),
1118 )
1119 .unwrap();
1120
1121 assert_eq!(
1123 store.get("default", "flow_legacy:xyz").unwrap(),
1124 Some(serde_json::json!({"old": "value"})),
1125 "must fall back to legacy default.json when dispatched file absent"
1126 );
1127
1128 store
1131 .set(
1132 "default",
1133 "flow_legacy:xyz",
1134 serde_json::json!({"new": "promoted"}),
1135 )
1136 .unwrap();
1137 assert!(
1138 tmp.path().join("flow_legacy").join("xyz.json").exists(),
1139 "set() must promote dispatched-shaped keys to per-key file"
1140 );
1141 assert_eq!(
1142 store.get("default", "flow_legacy:xyz").unwrap(),
1143 Some(serde_json::json!({"new": "promoted"})),
1144 "dispatched file must shadow legacy entry on subsequent reads"
1145 );
1146 }
1147
1148 #[test]
1151 fn dispatch_skips_keys_without_colon_or_with_unsafe_segments() {
1152 let (store, tmp) = new_store();
1153 store
1154 .set("default", "no_colon", serde_json::json!(1))
1155 .unwrap();
1156 store
1157 .set("default", "bad/prefix:id", serde_json::json!(2))
1158 .unwrap();
1159 store
1160 .set("default", "prefix:bad/id", serde_json::json!(3))
1161 .unwrap();
1162 store
1163 .set("default", "prefix:..", serde_json::json!(4))
1164 .unwrap();
1165 let legacy = tmp.path().join("default.json");
1167 let raw: HashMap<String, Value> =
1168 serde_json::from_str(&std::fs::read_to_string(&legacy).unwrap()).unwrap();
1169 assert_eq!(raw.get("no_colon"), Some(&serde_json::json!(1)));
1170 assert_eq!(raw.get("bad/prefix:id"), Some(&serde_json::json!(2)));
1171 assert_eq!(raw.get("prefix:bad/id"), Some(&serde_json::json!(3)));
1172 assert_eq!(raw.get("prefix:.."), Some(&serde_json::json!(4)));
1173 assert!(!tmp.path().join("bad").exists());
1175 assert!(!tmp.path().join("prefix").exists());
1176 }
1177
1178 #[test]
1180 fn dispatch_delete_removes_per_key_file() {
1181 let (store, tmp) = new_store();
1182 store.set("default", "p:q", serde_json::json!("v")).unwrap();
1183 let dispatched = tmp.path().join("p").join("q.json");
1184 assert!(
1185 dispatched.exists(),
1186 "dispatched file should exist before delete"
1187 );
1188 assert!(store.delete("default", "p:q").unwrap());
1189 assert!(
1190 !dispatched.exists(),
1191 "dispatched file should be removed after delete"
1192 );
1193 assert!(!store.delete("default", "p:q").unwrap());
1195 }
1196
1197 #[test]
1199 fn dispatch_has_reports_dispatched_file_existence() {
1200 let (store, _tmp) = new_store();
1201 assert!(!store.has("default", "p:q").unwrap());
1202 store.set("default", "p:q", serde_json::json!("v")).unwrap();
1203 assert!(store.has("default", "p:q").unwrap());
1204 }
1205
1206 #[test]
1209 fn dispatch_set_nx_blocks_when_legacy_or_dispatched_entry_exists() {
1210 let (store, tmp) = new_store();
1211 store
1213 .set("default", "boot", serde_json::json!(true))
1214 .unwrap();
1215 let legacy_path = tmp.path().join("default.json");
1216 let mut existing: HashMap<String, Value> =
1217 serde_json::from_str(&std::fs::read_to_string(&legacy_path).unwrap()).unwrap();
1218 existing.insert("p:q".to_string(), serde_json::json!("legacy_only"));
1219 std::fs::write(
1220 &legacy_path,
1221 serde_json::to_string_pretty(&existing).unwrap(),
1222 )
1223 .unwrap();
1224 assert!(!store
1226 .set_nx("default", "p:q", serde_json::json!("new"))
1227 .unwrap());
1228
1229 assert!(store
1233 .set_nx("default", "p:r", serde_json::json!("first"))
1234 .unwrap());
1235 assert!(tmp.path().join("p").join("r.json").exists());
1236 assert!(!store
1237 .set_nx("default", "p:r", serde_json::json!("second"))
1238 .unwrap());
1239 }
1240
1241 #[test]
1245 fn dispatch_incr_promotes_legacy_value_on_first_call() {
1246 let (store, tmp) = new_store();
1247 store.set("default", "seed", serde_json::json!(0)).unwrap();
1249 let legacy_path = tmp.path().join("default.json");
1250 let mut existing: HashMap<String, Value> =
1251 serde_json::from_str(&std::fs::read_to_string(&legacy_path).unwrap()).unwrap();
1252 existing.insert("counter:cnt".to_string(), serde_json::json!(7));
1253 std::fs::write(
1254 &legacy_path,
1255 serde_json::to_string_pretty(&existing).unwrap(),
1256 )
1257 .unwrap();
1258
1259 let result = store.incr("default", "counter:cnt", 3.0, 0.0).unwrap();
1262 assert_eq!(result, 10.0);
1263 let dispatched = tmp.path().join("counter").join("cnt.json");
1264 assert!(dispatched.exists(), "dispatched file must be created");
1265
1266 let result2 = store.incr("default", "counter:cnt", 2.0, 0.0).unwrap();
1268 assert_eq!(result2, 12.0);
1269 }
1270
1271 #[test]
1274 fn is_safe_segment_validates_path_safety() {
1275 assert!(is_safe_segment("flow_orch"));
1276 assert!(is_safe_segment("abc-123"));
1277 assert!(is_safe_segment("v1.2.3"));
1278 assert!(!is_safe_segment(""));
1279 assert!(!is_safe_segment("."));
1280 assert!(!is_safe_segment(".."));
1281 assert!(!is_safe_segment("a..b"));
1282 assert!(!is_safe_segment("a/b"));
1283 assert!(!is_safe_segment("a\\b"));
1284 assert!(!is_safe_segment("a b"));
1285 assert!(!is_safe_segment("a\0b"));
1286 }
1287
1288 mod dispatched_layout {
1291 use super::*;
1292
1293 fn seed(tmp: &TempDir, ns: &str, key: &str, value: serde_json::Value) {
1296 let dir = tmp.path().join(ns);
1297 fs::create_dir_all(&dir).expect("create ns dir");
1299 let path = dir.join(format!("{key}.json"));
1300 fs::write(
1301 path,
1302 serde_json::to_string_pretty(&value).expect("serialize"),
1303 )
1304 .expect("write seed file");
1305 }
1306
1307 #[test]
1309 fn list_returns_json_keys_only() {
1310 let (store, tmp) = new_store();
1311 seed(&tmp, "myns", "alpha", serde_json::json!(1));
1312 seed(&tmp, "myns", "beta", serde_json::json!(2));
1313 let ns_dir = tmp.path().join("myns");
1315 fs::write(ns_dir.join("alpha.json.bak"), b"backup").expect("write bak");
1316 fs::write(ns_dir.join("alpha.json.tmp"), b"tmp").expect("write tmp");
1317 fs::write(ns_dir.join("notes.txt"), b"text").expect("write txt");
1318
1319 let keys = store.list_dispatched("myns").unwrap();
1320 assert_eq!(
1321 keys,
1322 vec!["alpha", "beta"],
1323 "must be sorted, .bak/.tmp/.txt excluded"
1324 );
1325 }
1326
1327 #[test]
1330 fn list_returns_empty_for_absent_namespace() {
1331 let (store, _tmp) = new_store();
1332 let keys = store.list_dispatched("ghost").unwrap();
1333 assert!(keys.is_empty(), "absent namespace should return empty Vec");
1334 }
1335
1336 #[test]
1339 fn list_returns_empty_when_only_non_json_files_present() {
1340 let (store, tmp) = new_store();
1341 let ns_dir = tmp.path().join("empty_ns");
1342 fs::create_dir_all(&ns_dir).expect("create dir");
1344 fs::write(ns_dir.join("readme.txt"), b"hi").expect("write");
1345 let keys = store.list_dispatched("empty_ns").unwrap();
1346 assert!(keys.is_empty());
1347 }
1348
1349 #[test]
1352 fn show_returns_key_not_found_for_absent_namespace() {
1353 let (store, _tmp) = new_store();
1354 let err = store.show_dispatched("nodir", "anykey").unwrap_err();
1355 assert!(
1356 matches!(err, StateError::KeyNotFound { .. }),
1357 "expected KeyNotFound, got: {err}"
1358 );
1359 assert!(err.to_string().contains("not found"), "{err}");
1361 }
1362
1363 #[test]
1366 fn show_returns_key_not_found_for_absent_key() {
1367 let (store, tmp) = new_store();
1368 let ns_dir = tmp.path().join("myns2");
1370 fs::create_dir_all(&ns_dir).expect("create dir");
1372
1373 let err = store.show_dispatched("myns2", "missing").unwrap_err();
1374 assert!(
1375 matches!(err, StateError::KeyNotFound { .. }),
1376 "expected KeyNotFound, got: {err}"
1377 );
1378 }
1379
1380 #[test]
1382 fn show_returns_full_value_for_existing_key() {
1383 let (store, tmp) = new_store();
1384 let expected = serde_json::json!({"data": {"completed_steps": ["a", "b"], "x": 42}});
1385 seed(&tmp, "showns", "task1", expected.clone());
1386
1387 let result = store.show_dispatched("showns", "task1").unwrap();
1388 assert_eq!(result, expected);
1389 }
1390 }
1391
1392 mod reset_atomicity {
1393 use super::*;
1394
1395 fn seed(tmp: &TempDir, ns: &str, key: &str, value: serde_json::Value) {
1397 let dir = tmp.path().join(ns);
1398 fs::create_dir_all(&dir).expect("create ns dir");
1400 let path = dir.join(format!("{key}.json"));
1401 fs::write(
1402 path,
1403 serde_json::to_string_pretty(&value).expect("serialize"),
1404 )
1405 .expect("write seed");
1406 }
1407
1408 #[test]
1411 fn reset_removes_steps_and_fields_and_creates_backup() {
1412 let (store, tmp) = new_store();
1413 let original = serde_json::json!({
1414 "data": {
1415 "completed_steps": ["a", "b", "c"],
1416 "x": 1,
1417 "y": "hello"
1418 }
1419 });
1420 seed(&tmp, "testns", "task1", original.clone());
1421
1422 let report = store
1423 .reset_dispatched_with_backup(
1424 "testns",
1425 "task1",
1426 &["b".to_string()],
1427 &["x".to_string()],
1428 )
1429 .unwrap();
1430
1431 let bak_path = tmp.path().join("testns").join("task1.json.bak");
1433 assert!(
1434 bak_path.exists(),
1435 ".bak file must exist at {}",
1436 bak_path.display()
1437 );
1438 assert_eq!(report.backup_path, bak_path);
1439 let bak_content: serde_json::Value =
1440 serde_json::from_str(&fs::read_to_string(&bak_path).expect("read bak"))
1441 .expect("parse bak");
1442 assert_eq!(bak_content, original, ".bak must contain original content");
1443
1444 let live_path = tmp.path().join("testns").join("task1.json");
1446 let live_content: serde_json::Value =
1447 serde_json::from_str(&fs::read_to_string(&live_path).expect("read live"))
1448 .expect("parse live");
1449 let expected = serde_json::json!({
1450 "data": {
1451 "completed_steps": ["a", "c"],
1452 "y": "hello"
1453 }
1454 });
1455 assert_eq!(live_content, expected, "live file must be mutated");
1456
1457 assert_eq!(report.steps_removed, 1, "one step removed");
1459 assert_eq!(report.fields_removed, 1, "one field removed");
1460 }
1461
1462 #[test]
1464 fn reset_removes_multiple_steps_and_fields() {
1465 let (store, tmp) = new_store();
1466 let original = serde_json::json!({
1467 "data": {
1468 "completed_steps": ["s1", "s2", "s3", "s4"],
1469 "repo_readiness": "NOT_READY",
1470 "repo_readiness_report": "details here",
1471 "plan_gate_retries": 2
1472 }
1473 });
1474 seed(&tmp, "orchns", "task-abc", original.clone());
1475
1476 let report = store
1477 .reset_dispatched_with_backup(
1478 "orchns",
1479 "task-abc",
1480 &["s2".to_string(), "s3".to_string()],
1481 &[
1482 "repo_readiness".to_string(),
1483 "repo_readiness_report".to_string(),
1484 ],
1485 )
1486 .unwrap();
1487
1488 let live_path = tmp.path().join("orchns").join("task-abc.json");
1489 let live: serde_json::Value =
1490 serde_json::from_str(&fs::read_to_string(&live_path).expect("read"))
1491 .expect("parse");
1492 assert_eq!(
1493 live["data"]["completed_steps"],
1494 serde_json::json!(["s1", "s4"])
1495 );
1496 assert!(live["data"].get("repo_readiness").is_none());
1497 assert!(live["data"].get("repo_readiness_report").is_none());
1498 assert_eq!(live["data"]["plan_gate_retries"], 2);
1499
1500 assert_eq!(report.steps_removed, 2);
1501 assert_eq!(report.fields_removed, 2);
1502 }
1503
1504 #[test]
1506 fn reset_returns_key_not_found_for_absent_file() {
1507 let (store, _tmp) = new_store();
1508 let err = store
1509 .reset_dispatched_with_backup("ns", "missing", &[], &[])
1510 .unwrap_err();
1511 assert!(
1512 matches!(err, StateError::KeyNotFound { .. }),
1513 "expected KeyNotFound, got: {err}"
1514 );
1515 }
1516
1517 #[test]
1519 fn reset_returns_shape_invalid_when_data_absent() {
1520 let (store, tmp) = new_store();
1521 let bad = serde_json::json!({"identity": {"task_id": "t1"}});
1523 let dir = tmp.path().join("badns");
1524 fs::create_dir_all(&dir).expect("create dir");
1526 fs::write(
1527 dir.join("k.json"),
1528 serde_json::to_string_pretty(&bad).expect("ser"),
1529 )
1530 .expect("write");
1531
1532 let err = store
1533 .reset_dispatched_with_backup("badns", "k", &["s".to_string()], &[])
1534 .unwrap_err();
1535 assert!(
1536 matches!(err, StateError::ShapeInvalid { .. }),
1537 "expected ShapeInvalid, got: {err}"
1538 );
1539 assert!(err.to_string().contains("data"), "{err}");
1540 }
1541
1542 #[test]
1545 fn reset_returns_shape_invalid_when_completed_steps_not_array() {
1546 let (store, tmp) = new_store();
1547 let bad = serde_json::json!({"data": {"completed_steps": {"step": "a"}}});
1549 let dir = tmp.path().join("badns2");
1550 fs::create_dir_all(&dir).expect("create dir");
1552 fs::write(
1553 dir.join("k.json"),
1554 serde_json::to_string_pretty(&bad).expect("ser"),
1555 )
1556 .expect("write");
1557
1558 let err = store
1559 .reset_dispatched_with_backup("badns2", "k", &["a".to_string()], &[])
1560 .unwrap_err();
1561 assert!(
1562 matches!(err, StateError::ShapeInvalid { .. }),
1563 "expected ShapeInvalid, got: {err}"
1564 );
1565 assert!(
1566 err.to_string().contains("completed_steps"),
1567 "message should mention completed_steps: {err}"
1568 );
1569 }
1570 }
1571
1572 mod path_traversal {
1573 use super::*;
1574
1575 #[test]
1577 fn list_rejects_unsafe_namespace() {
1578 let (store, _tmp) = new_store();
1579 let err = store.list_dispatched("../evil").unwrap_err();
1580 assert!(
1581 matches!(
1582 err,
1583 StateError::UnsafeSegment {
1584 which: "namespace",
1585 ..
1586 }
1587 ),
1588 "expected UnsafeSegment{{namespace}}, got: {err}"
1589 );
1590 }
1591
1592 #[test]
1594 fn show_rejects_unsafe_key() {
1595 let (store, _tmp) = new_store();
1596 let err = store.show_dispatched("ns", "foo/bar").unwrap_err();
1597 assert!(
1598 matches!(err, StateError::UnsafeSegment { which: "key", .. }),
1599 "expected UnsafeSegment{{key}}, got: {err}"
1600 );
1601 }
1602
1603 #[test]
1605 fn reset_rejects_empty_namespace() {
1606 let (store, _tmp) = new_store();
1607 let err = store
1608 .reset_dispatched_with_backup("", "key", &[], &[])
1609 .unwrap_err();
1610 assert!(
1611 matches!(
1612 err,
1613 StateError::UnsafeSegment {
1614 which: "namespace",
1615 ..
1616 }
1617 ),
1618 "expected UnsafeSegment{{namespace}}, got: {err}"
1619 );
1620 }
1621
1622 #[test]
1624 fn reset_rejects_dotdot_key() {
1625 let (store, _tmp) = new_store();
1626 let err = store
1627 .reset_dispatched_with_backup("ns", "..", &[], &[])
1628 .unwrap_err();
1629 assert!(
1630 matches!(err, StateError::UnsafeSegment { which: "key", .. }),
1631 "expected UnsafeSegment{{key}}, got: {err}"
1632 );
1633 }
1634 }
1635}
1636
1637#[cfg(test)]
1638mod proptests {
1639 use super::*;
1640 use proptest::prelude::*;
1641
1642 fn new_store() -> (JsonFileStore, tempfile::TempDir) {
1643 let tmp = tempfile::tempdir().unwrap();
1644 let store = JsonFileStore::new(tmp.path().to_path_buf());
1645 (store, tmp)
1646 }
1647
1648 proptest! {
1649 #[test]
1651 fn roundtrip_arbitrary_values(
1652 key in "[a-z]{1,20}",
1653 val in any::<i64>(),
1654 ) {
1655 let (store, _tmp) = new_store();
1656 let ns = "rt";
1657 let json_val = serde_json::json!(val);
1658 store.set(ns, &key, json_val.clone()).unwrap();
1659 let got = store.get(ns, &key).unwrap();
1660 prop_assert_eq!(got, Some(json_val));
1661 let _ = store.delete(ns, &key);
1662 }
1663
1664 #[test]
1666 fn traversal_always_rejected(
1667 prefix in "[a-z]{0,5}",
1668 suffix in "[a-z]{0,5}",
1669 ) {
1670 let (store, _tmp) = new_store();
1671 let evil = format!("{prefix}/../{suffix}");
1672 prop_assert!(store.state_path(&evil).is_err());
1673 }
1674
1675 #[test]
1677 fn nul_byte_always_rejected(
1678 prefix in "[a-z]{0,10}",
1679 suffix in "[a-z]{0,10}",
1680 ) {
1681 let (store, _tmp) = new_store();
1682 let evil = format!("{prefix}\0{suffix}");
1683 prop_assert!(store.state_path(&evil).is_err());
1684 }
1685 }
1686}