1use std::collections::HashMap;
49use std::fs;
50use std::path::{Path, PathBuf};
51use std::sync::{Arc, Mutex};
52
53use serde_json::Value;
54
55#[derive(thiserror::Error, Debug)]
67pub enum StateError {
68 #[error("state: key '{key}' not found in namespace '{namespace}'")]
74 KeyNotFound { namespace: String, key: String },
75
76 #[error("state: unsafe {which} segment '{value}'")]
82 UnsafeSegment { which: &'static str, value: String },
83
84 #[error("state: backup I/O failed: {0}")]
90 IoBackup(#[source] std::io::Error),
91
92 #[error("state: read failed: {0}")]
97 IoRead(#[source] std::io::Error),
98
99 #[error("state: write failed: {0}")]
105 IoWrite(#[source] std::io::Error),
106
107 #[error("state: serialize/parse failed: {0}")]
111 Serde(#[from] serde_json::Error),
112
113 #[error("state: shape invalid: {reason}")]
120 ShapeInvalid { reason: String },
121}
122
123#[derive(Debug, Clone)]
130pub struct ResetReport {
131 pub backup_path: PathBuf,
133 pub steps_removed: usize,
135 pub fields_removed: usize,
137}
138
139fn is_safe_segment(s: &str) -> bool {
147 if s.is_empty() || s == "." || s == ".." {
148 return false;
149 }
150 if s.contains("..") {
151 return false;
152 }
153 s.bytes()
154 .all(|b| b.is_ascii_alphanumeric() || b == b'_' || b == b'-' || b == b'.')
155}
156
157pub trait StateStore: Send + Sync {
166 fn get(&self, ns: &str, key: &str) -> Result<Option<Value>, String>;
168
169 fn set(&self, ns: &str, key: &str, value: Value) -> Result<(), String>;
171
172 fn delete(&self, ns: &str, key: &str) -> Result<bool, String>;
174
175 fn keys(&self, ns: &str) -> Result<Vec<String>, String>;
177
178 fn has(&self, ns: &str, key: &str) -> Result<bool, String>;
184
185 fn set_nx(&self, ns: &str, key: &str, value: Value) -> Result<bool, String>;
195
196 fn incr(&self, ns: &str, key: &str, delta: f64, default: f64) -> Result<f64, String>;
212}
213
214pub struct JsonFileStore {
244 root: PathBuf,
245 locks: Mutex<HashMap<PathBuf, Arc<Mutex<()>>>>,
248}
249
250impl JsonFileStore {
251 pub fn new(root: PathBuf) -> Self {
256 Self {
257 root,
258 locks: Mutex::new(HashMap::new()),
259 }
260 }
261
262 fn ns_lock(&self, path: &Path) -> Result<Arc<Mutex<()>>, String> {
269 let mut map = self
270 .locks
271 .lock()
272 .map_err(|_| "state: locks map poisoned".to_string())?;
273 Ok(Arc::clone(
274 map.entry(path.to_path_buf())
275 .or_insert_with(|| Arc::new(Mutex::new(()))),
276 ))
277 }
278
279 pub fn root(&self) -> &Path {
281 &self.root
282 }
283
284 fn ensure_root(&self) -> Result<&Path, String> {
286 if !self.root.exists() {
287 fs::create_dir_all(&self.root)
288 .map_err(|e| format!("Failed to create state dir: {e}"))?;
289 }
290 Ok(&self.root)
291 }
292
293 pub fn state_path(&self, ns: &str) -> Result<PathBuf, String> {
296 if ns.contains('/')
297 || ns.contains('\\')
298 || ns.contains("..")
299 || ns.contains('\0')
300 || ns.is_empty()
301 {
302 return Err(format!("Invalid namespace: '{ns}'"));
303 }
304 let dir = self.ensure_root()?;
305 Ok(dir.join(format!("{ns}.json")))
306 }
307
308 fn dispatch_path(&self, key: &str) -> Result<Option<PathBuf>, String> {
323 let (prefix, id) = match key.split_once(':') {
324 Some(pair) => pair,
325 None => return Ok(None),
326 };
327 if !is_safe_segment(prefix) || !is_safe_segment(id) {
328 return Ok(None);
329 }
330 let dir = self.ensure_root()?;
331 Ok(Some(dir.join(prefix).join(format!("{id}.json"))))
332 }
333
334 fn load_dispatched(&self, path: &Path) -> Result<Option<Value>, String> {
337 if !path.exists() {
338 return Ok(None);
339 }
340 let content = fs::read_to_string(path)
341 .map_err(|e| format!("Failed to read dispatched state '{}': {e}", path.display()))?;
342 let v: Value = serde_json::from_str(&content)
343 .map_err(|e| format!("Failed to parse dispatched state '{}': {e}", path.display()))?;
344 Ok(Some(v))
345 }
346
347 fn save_dispatched(&self, path: &Path, value: &Value) -> Result<(), String> {
350 if let Some(parent) = path.parent() {
351 if !parent.exists() {
352 fs::create_dir_all(parent).map_err(|e| {
353 format!(
354 "Failed to create dispatched state dir '{}': {e}",
355 parent.display()
356 )
357 })?;
358 }
359 }
360 let tmp = path.with_extension("json.tmp");
361 let content = serde_json::to_string_pretty(value)
362 .map_err(|e| format!("Failed to serialize dispatched state: {e}"))?;
363 fs::write(&tmp, &content)
364 .map_err(|e| format!("Failed to write dispatched state tmp: {e}"))?;
365 fs::rename(&tmp, path)
366 .map_err(|e| format!("Failed to rename dispatched state file: {e}"))?;
367 Ok(())
368 }
369
370 pub fn list_dispatched(&self, namespace: &str) -> Result<Vec<String>, StateError> {
392 if !is_safe_segment(namespace) {
393 return Err(StateError::UnsafeSegment {
394 which: "namespace",
395 value: namespace.to_string(),
396 });
397 }
398 let ns_dir = self.root.join(namespace);
399 if !ns_dir.exists() {
400 return Ok(Vec::new());
401 }
402 let mut keys = Vec::new();
403 let entries = fs::read_dir(&ns_dir).map_err(StateError::IoRead)?;
404 for entry in entries {
405 let entry = entry.map_err(StateError::IoRead)?;
406 let fname = entry.file_name();
407 let fname_str = fname.to_string_lossy();
408 if !fname_str.ends_with(".json")
410 || fname_str.ends_with(".json.bak")
411 || fname_str.ends_with(".json.tmp")
412 {
413 continue;
414 }
415 let key = fname_str
417 .strip_suffix(".json")
418 .unwrap_or(&fname_str)
419 .to_string();
420 keys.push(key);
421 }
422 keys.sort();
423 Ok(keys)
424 }
425
426 pub fn show_dispatched(
443 &self,
444 namespace: &str,
445 key: &str,
446 ) -> Result<serde_json::Value, StateError> {
447 if !is_safe_segment(namespace) {
448 return Err(StateError::UnsafeSegment {
449 which: "namespace",
450 value: namespace.to_string(),
451 });
452 }
453 if !is_safe_segment(key) {
454 return Err(StateError::UnsafeSegment {
455 which: "key",
456 value: key.to_string(),
457 });
458 }
459 let target = self.root.join(namespace).join(format!("{key}.json"));
460 if !target.exists() {
461 return Err(StateError::KeyNotFound {
462 namespace: namespace.to_string(),
463 key: key.to_string(),
464 });
465 }
466 let content = fs::read_to_string(&target).map_err(StateError::IoRead)?;
467 let value: serde_json::Value = serde_json::from_str(&content)?;
468 Ok(value)
469 }
470
471 pub fn reset_dispatched_with_backup(
514 &self,
515 namespace: &str,
516 key: &str,
517 steps: &[String],
518 fields: &[String],
519 ) -> Result<ResetReport, StateError> {
520 if !is_safe_segment(namespace) {
522 return Err(StateError::UnsafeSegment {
523 which: "namespace",
524 value: namespace.to_string(),
525 });
526 }
527 if !is_safe_segment(key) {
528 return Err(StateError::UnsafeSegment {
529 which: "key",
530 value: key.to_string(),
531 });
532 }
533
534 let target = self.root.join(namespace).join(format!("{key}.json"));
536
537 if !target.exists() {
539 return Err(StateError::KeyNotFound {
540 namespace: namespace.to_string(),
541 key: key.to_string(),
542 });
543 }
544
545 let lock = self
547 .ns_lock(&target)
548 .map_err(|s| StateError::ShapeInvalid { reason: s })?;
549 let _guard = lock.lock().map_err(|_| StateError::ShapeInvalid {
550 reason: "lock poisoned".to_string(),
551 })?;
552
553 let bak_path = target.with_extension("json.bak");
555 fs::copy(&target, &bak_path).map_err(StateError::IoBackup)?;
556
557 let content = fs::read_to_string(&target).map_err(StateError::IoRead)?;
559 let mut value: serde_json::Value = serde_json::from_str(&content)?;
560
561 let data = value
563 .get_mut("data")
564 .ok_or_else(|| StateError::ShapeInvalid {
565 reason: "missing 'data' top-level field".to_string(),
566 })?;
567 let data_obj = data
568 .as_object_mut()
569 .ok_or_else(|| StateError::ShapeInvalid {
570 reason: "'data' top-level field must be an object".to_string(),
571 })?;
572
573 let mut steps_removed = 0usize;
575 if !steps.is_empty() {
576 if let Some(cs) = data_obj.get_mut("completed_steps") {
577 if let Some(arr) = cs.as_array_mut() {
578 let before = arr.len();
579 arr.retain(|v| {
580 if let Some(s) = v.as_str() {
581 !steps.iter().any(|step| step == s)
582 } else {
583 true
584 }
585 });
586 steps_removed = before - arr.len();
587 } else {
588 return Err(StateError::ShapeInvalid {
589 reason: "data.completed_steps must be an array".to_string(),
590 });
591 }
592 }
593 }
595
596 let mut fields_removed = 0usize;
598 for field in fields {
599 if data_obj.remove(field.as_str()).is_some() {
600 fields_removed += 1;
601 }
602 }
603
604 let tmp = target.with_extension("json.tmp");
606 let serialized = serde_json::to_string_pretty(&value)?;
607 fs::write(&tmp, &serialized).map_err(StateError::IoWrite)?;
608
609 fs::rename(&tmp, &target).map_err(StateError::IoWrite)?;
611
612 Ok(ResetReport {
613 backup_path: bak_path,
614 steps_removed,
615 fields_removed,
616 })
617 }
618
619 fn load(&self, ns: &str) -> Result<HashMap<String, Value>, String> {
620 let path = self.state_path(ns)?;
621 if !path.exists() {
622 return Ok(HashMap::new());
623 }
624 let content =
625 fs::read_to_string(&path).map_err(|e| format!("Failed to read state '{ns}': {e}"))?;
626 serde_json::from_str(&content).map_err(|e| format!("Failed to parse state '{ns}': {e}"))
627 }
628
629 fn save(&self, ns: &str, data: &HashMap<String, Value>) -> Result<(), String> {
630 let path = self.state_path(ns)?;
631 let tmp = path.with_extension("json.tmp");
632 let content = serde_json::to_string_pretty(data)
633 .map_err(|e| format!("Failed to serialize state: {e}"))?;
634 fs::write(&tmp, &content).map_err(|e| format!("Failed to write state tmp: {e}"))?;
635 fs::rename(&tmp, &path).map_err(|e| format!("Failed to rename state file: {e}"))?;
636 Ok(())
637 }
638}
639
640impl StateStore for JsonFileStore {
641 fn get(&self, ns: &str, key: &str) -> Result<Option<Value>, String> {
642 if let Some(dpath) = self.dispatch_path(key)? {
647 let lock = self.ns_lock(&dpath)?;
648 let _guard = lock
649 .lock()
650 .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
651 if let Some(v) = self.load_dispatched(&dpath)? {
652 return Ok(Some(v));
653 }
654 }
657 let path = self.state_path(ns)?;
658 let lock = self.ns_lock(&path)?;
659 let _guard = lock
660 .lock()
661 .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
662 let state = self.load(ns)?;
663 Ok(state.get(key).cloned())
664 }
665
666 fn set(&self, ns: &str, key: &str, value: Value) -> Result<(), String> {
667 if let Some(dpath) = self.dispatch_path(key)? {
668 let lock = self.ns_lock(&dpath)?;
669 let _guard = lock
670 .lock()
671 .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
672 return self.save_dispatched(&dpath, &value);
673 }
674 let path = self.state_path(ns)?;
675 let lock = self.ns_lock(&path)?;
676 let _guard = lock
677 .lock()
678 .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
679 let mut state = self.load(ns)?;
680 state.insert(key.to_string(), value);
681 self.save(ns, &state)
682 }
683
684 fn delete(&self, ns: &str, key: &str) -> Result<bool, String> {
685 if let Some(dpath) = self.dispatch_path(key)? {
686 let lock = self.ns_lock(&dpath)?;
687 let _guard = lock
688 .lock()
689 .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
690 if dpath.exists() {
691 fs::remove_file(&dpath).map_err(|e| {
692 format!(
693 "Failed to delete dispatched state '{}': {e}",
694 dpath.display()
695 )
696 })?;
697 return Ok(true);
698 }
699 }
702 let path = self.state_path(ns)?;
703 let lock = self.ns_lock(&path)?;
704 let _guard = lock
705 .lock()
706 .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
707 let mut state = self.load(ns)?;
708 let existed = state.remove(key).is_some();
709 if existed {
710 self.save(ns, &state)?;
711 }
712 Ok(existed)
713 }
714
715 fn keys(&self, ns: &str) -> Result<Vec<String>, String> {
716 let path = self.state_path(ns)?;
717 let lock = self.ns_lock(&path)?;
718 let _guard = lock
719 .lock()
720 .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
721 let state = self.load(ns)?;
722 Ok(state.keys().cloned().collect())
723 }
724
725 fn has(&self, ns: &str, key: &str) -> Result<bool, String> {
726 if let Some(dpath) = self.dispatch_path(key)? {
727 let lock = self.ns_lock(&dpath)?;
728 let _guard = lock
729 .lock()
730 .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
731 if dpath.exists() {
732 return Ok(true);
733 }
734 }
736 let path = self.state_path(ns)?;
737 let lock = self.ns_lock(&path)?;
738 let _guard = lock
739 .lock()
740 .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
741 let state = self.load(ns)?;
742 Ok(state.contains_key(key))
743 }
744
745 fn set_nx(&self, ns: &str, key: &str, value: Value) -> Result<bool, String> {
746 if let Some(dpath) = self.dispatch_path(key)? {
747 let lock = self.ns_lock(&dpath)?;
748 let _guard = lock
749 .lock()
750 .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
751 if dpath.exists() {
752 return Ok(false);
753 }
754 let path = self.state_path(ns)?;
757 if path.exists() {
758 let state = self.load(ns)?;
759 if state.contains_key(key) {
760 return Ok(false);
761 }
762 }
763 self.save_dispatched(&dpath, &value)?;
764 return Ok(true);
765 }
766 let path = self.state_path(ns)?;
767 let lock = self.ns_lock(&path)?;
768 let _guard = lock
769 .lock()
770 .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
771 let mut state = self.load(ns)?;
772 if state.contains_key(key) {
773 return Ok(false);
774 }
775 state.insert(key.to_string(), value);
776 self.save(ns, &state)?;
777 Ok(true)
778 }
779
780 fn incr(&self, ns: &str, key: &str, delta: f64, default: f64) -> Result<f64, String> {
781 if let Some(dpath) = self.dispatch_path(key)? {
782 let lock = self.ns_lock(&dpath)?;
783 let _guard = lock
784 .lock()
785 .map_err(|_| format!("state: dispatch lock poisoned for key '{key}'"))?;
786 let current = if let Some(v) = self.load_dispatched(&dpath)? {
787 v.as_f64()
788 .ok_or_else(|| format!("incr: value at '{key}' is not a number"))?
789 } else {
790 let path = self.state_path(ns)?;
793 if path.exists() {
794 let state = self.load(ns)?;
795 match state.get(key) {
796 Some(v) => v
797 .as_f64()
798 .ok_or_else(|| format!("incr: value at '{key}' is not a number"))?,
799 None => default,
800 }
801 } else {
802 default
803 }
804 };
805 let new_val = current + delta;
806 self.save_dispatched(&dpath, &serde_json::json!(new_val))?;
807 return Ok(new_val);
808 }
809 let path = self.state_path(ns)?;
810 let lock = self.ns_lock(&path)?;
811 let _guard = lock
812 .lock()
813 .map_err(|_| format!("state: lock poisoned for ns '{ns}'"))?;
814 let mut state = self.load(ns)?;
815 let current = match state.get(key) {
816 Some(v) => v
817 .as_f64()
818 .ok_or_else(|| format!("incr: value at '{key}' is not a number"))?,
819 None => default,
820 };
821 let new_val = current + delta;
822 state.insert(key.to_string(), serde_json::json!(new_val));
823 self.save(ns, &state)?;
824 Ok(new_val)
825 }
826}
827
828#[cfg(test)]
829mod tests {
830 use super::*;
831 use tempfile::TempDir;
832
833 fn new_store() -> (JsonFileStore, TempDir) {
836 let tmp = tempfile::tempdir().unwrap();
837 let store = JsonFileStore::new(tmp.path().to_path_buf());
838 (store, tmp)
839 }
840
841 #[test]
842 fn roundtrip() {
843 let (store, _tmp) = new_store();
844 let ns = "rt";
845
846 store.set(ns, "count", serde_json::json!(42)).unwrap();
847 store
848 .set(ns, "name", serde_json::json!("algocline"))
849 .unwrap();
850
851 assert_eq!(store.get(ns, "count").unwrap(), Some(serde_json::json!(42)));
852 assert_eq!(
853 store.get(ns, "name").unwrap(),
854 Some(serde_json::json!("algocline"))
855 );
856 assert_eq!(store.get(ns, "missing").unwrap(), None);
857
858 let k = store.keys(ns).unwrap();
859 assert!(k.contains(&"count".to_string()));
860 assert!(k.contains(&"name".to_string()));
861
862 assert!(store.delete(ns, "count").unwrap());
863 assert!(!store.delete(ns, "count").unwrap());
864 assert_eq!(store.get(ns, "count").unwrap(), None);
865 }
866
867 #[test]
868 fn invalid_namespace() {
869 let (store, _tmp) = new_store();
870 assert!(store.state_path("../evil").is_err());
871 assert!(store.state_path("foo/bar").is_err());
872 assert!(store.state_path("foo\\bar").is_err());
873 assert!(store.state_path("").is_err());
874 assert!(store.state_path("foo\0bar").is_err());
875 }
876
877 #[test]
878 fn get_nonexistent_namespace_returns_empty() {
879 let (store, _tmp) = new_store();
880 let result = store.get("ghost_ns", "any_key").unwrap();
881 assert_eq!(result, None);
882 }
883
884 #[test]
885 fn keys_nonexistent_namespace_returns_empty() {
886 let (store, _tmp) = new_store();
887 let result = store.keys("ghost_ns").unwrap();
888 assert!(result.is_empty());
889 }
890
891 #[test]
892 fn delete_nonexistent_key_returns_false() {
893 let (store, _tmp) = new_store();
894 assert!(!store.delete("delns", "nope").unwrap());
895 }
896
897 #[test]
898 fn set_overwrites_existing_value() {
899 let (store, _tmp) = new_store();
900 let ns = "ow";
901
902 store.set(ns, "k", serde_json::json!(1)).unwrap();
903 store.set(ns, "k", serde_json::json!(2)).unwrap();
904 assert_eq!(store.get(ns, "k").unwrap(), Some(serde_json::json!(2)));
905 }
906
907 #[test]
908 fn state_path_valid_namespaces() {
909 let (store, _tmp) = new_store();
910 assert!(store.state_path("default").is_ok());
911 assert!(store.state_path("my-app").is_ok());
912 assert!(store.state_path("test_123").is_ok());
913 }
914
915 #[test]
918 fn has_returns_existence() {
919 let (store, _tmp) = new_store();
920 let ns = "hasns";
921
922 assert!(!store.has(ns, "x").unwrap());
923 store.set(ns, "x", serde_json::json!(1)).unwrap();
924 assert!(store.has(ns, "x").unwrap());
925 }
926
927 #[test]
928 fn set_nx_only_sets_if_absent() {
929 let (store, _tmp) = new_store();
930 let ns = "snx";
931
932 assert!(store.set_nx(ns, "k", serde_json::json!("first")).unwrap());
933 assert!(!store.set_nx(ns, "k", serde_json::json!("second")).unwrap());
934 assert_eq!(
935 store.get(ns, "k").unwrap(),
936 Some(serde_json::json!("first")),
937 "set_nx should not overwrite"
938 );
939 }
940
941 #[test]
942 fn incr_initialises_and_increments() {
943 let (store, _tmp) = new_store();
944 let ns = "inc";
945
946 let v = store.incr(ns, "counter", 1.0, 0.0).unwrap();
948 assert!((v - 1.0).abs() < f64::EPSILON);
949
950 let v = store.incr(ns, "counter", 5.0, 0.0).unwrap();
952 assert!((v - 6.0).abs() < f64::EPSILON);
953
954 let v = store.incr(ns, "counter", -2.0, 0.0).unwrap();
956 assert!((v - 4.0).abs() < f64::EPSILON);
957 }
958
959 #[test]
960 fn incr_rejects_non_numeric() {
961 let (store, _tmp) = new_store();
962 let ns = "incerr";
963
964 store.set(ns, "s", serde_json::json!("hello")).unwrap();
965 let err = store.incr(ns, "s", 1.0, 0.0).unwrap_err();
966 assert!(err.contains("not a number"), "got: {err}");
967 }
968
969 #[test]
970 fn incr_custom_default() {
971 let (store, _tmp) = new_store();
972 let ns = "incdef";
973
974 let v = store.incr(ns, "score", 10.0, 100.0).unwrap();
975 assert!((v - 110.0).abs() < f64::EPSILON, "100 + 10 = 110");
976 }
977
978 #[test]
984 fn dispatch_writes_to_per_key_file_for_prefix_id_keys() {
985 let (store, tmp) = new_store();
986 store
987 .set(
988 "default",
989 "flow_orch:abc-123",
990 serde_json::json!({"step": 1}),
991 )
992 .unwrap();
993 let dispatched = tmp.path().join("flow_orch").join("abc-123.json");
994 assert!(
995 dispatched.exists(),
996 "dispatched file must exist at {}",
997 dispatched.display()
998 );
999 let legacy = tmp.path().join("default.json");
1001 assert!(
1002 !legacy.exists(),
1003 "legacy default.json must not be created for dispatched keys"
1004 );
1005 }
1006
1007 #[test]
1011 fn dispatch_read_falls_back_to_legacy_for_unmigrated_entries() {
1012 let (store, tmp) = new_store();
1013 store
1018 .set("default", "boot_marker", serde_json::json!(true))
1019 .unwrap();
1020 let legacy_path = tmp.path().join("default.json");
1021 let mut existing: HashMap<String, Value> =
1022 serde_json::from_str(&std::fs::read_to_string(&legacy_path).unwrap()).unwrap();
1023 existing.insert(
1024 "flow_legacy:xyz".to_string(),
1025 serde_json::json!({"old": "value"}),
1026 );
1027 std::fs::write(
1028 &legacy_path,
1029 serde_json::to_string_pretty(&existing).unwrap(),
1030 )
1031 .unwrap();
1032
1033 assert_eq!(
1035 store.get("default", "flow_legacy:xyz").unwrap(),
1036 Some(serde_json::json!({"old": "value"})),
1037 "must fall back to legacy default.json when dispatched file absent"
1038 );
1039
1040 store
1043 .set(
1044 "default",
1045 "flow_legacy:xyz",
1046 serde_json::json!({"new": "promoted"}),
1047 )
1048 .unwrap();
1049 assert!(
1050 tmp.path().join("flow_legacy").join("xyz.json").exists(),
1051 "set() must promote dispatched-shaped keys to per-key file"
1052 );
1053 assert_eq!(
1054 store.get("default", "flow_legacy:xyz").unwrap(),
1055 Some(serde_json::json!({"new": "promoted"})),
1056 "dispatched file must shadow legacy entry on subsequent reads"
1057 );
1058 }
1059
1060 #[test]
1063 fn dispatch_skips_keys_without_colon_or_with_unsafe_segments() {
1064 let (store, tmp) = new_store();
1065 store
1066 .set("default", "no_colon", serde_json::json!(1))
1067 .unwrap();
1068 store
1069 .set("default", "bad/prefix:id", serde_json::json!(2))
1070 .unwrap();
1071 store
1072 .set("default", "prefix:bad/id", serde_json::json!(3))
1073 .unwrap();
1074 store
1075 .set("default", "prefix:..", serde_json::json!(4))
1076 .unwrap();
1077 let legacy = tmp.path().join("default.json");
1079 let raw: HashMap<String, Value> =
1080 serde_json::from_str(&std::fs::read_to_string(&legacy).unwrap()).unwrap();
1081 assert_eq!(raw.get("no_colon"), Some(&serde_json::json!(1)));
1082 assert_eq!(raw.get("bad/prefix:id"), Some(&serde_json::json!(2)));
1083 assert_eq!(raw.get("prefix:bad/id"), Some(&serde_json::json!(3)));
1084 assert_eq!(raw.get("prefix:.."), Some(&serde_json::json!(4)));
1085 assert!(!tmp.path().join("bad").exists());
1087 assert!(!tmp.path().join("prefix").exists());
1088 }
1089
1090 #[test]
1092 fn dispatch_delete_removes_per_key_file() {
1093 let (store, tmp) = new_store();
1094 store.set("default", "p:q", serde_json::json!("v")).unwrap();
1095 let dispatched = tmp.path().join("p").join("q.json");
1096 assert!(
1097 dispatched.exists(),
1098 "dispatched file should exist before delete"
1099 );
1100 assert!(store.delete("default", "p:q").unwrap());
1101 assert!(
1102 !dispatched.exists(),
1103 "dispatched file should be removed after delete"
1104 );
1105 assert!(!store.delete("default", "p:q").unwrap());
1107 }
1108
1109 #[test]
1111 fn dispatch_has_reports_dispatched_file_existence() {
1112 let (store, _tmp) = new_store();
1113 assert!(!store.has("default", "p:q").unwrap());
1114 store.set("default", "p:q", serde_json::json!("v")).unwrap();
1115 assert!(store.has("default", "p:q").unwrap());
1116 }
1117
1118 #[test]
1121 fn dispatch_set_nx_blocks_when_legacy_or_dispatched_entry_exists() {
1122 let (store, tmp) = new_store();
1123 store
1125 .set("default", "boot", serde_json::json!(true))
1126 .unwrap();
1127 let legacy_path = tmp.path().join("default.json");
1128 let mut existing: HashMap<String, Value> =
1129 serde_json::from_str(&std::fs::read_to_string(&legacy_path).unwrap()).unwrap();
1130 existing.insert("p:q".to_string(), serde_json::json!("legacy_only"));
1131 std::fs::write(
1132 &legacy_path,
1133 serde_json::to_string_pretty(&existing).unwrap(),
1134 )
1135 .unwrap();
1136 assert!(!store
1138 .set_nx("default", "p:q", serde_json::json!("new"))
1139 .unwrap());
1140
1141 assert!(store
1145 .set_nx("default", "p:r", serde_json::json!("first"))
1146 .unwrap());
1147 assert!(tmp.path().join("p").join("r.json").exists());
1148 assert!(!store
1149 .set_nx("default", "p:r", serde_json::json!("second"))
1150 .unwrap());
1151 }
1152
1153 #[test]
1157 fn dispatch_incr_promotes_legacy_value_on_first_call() {
1158 let (store, tmp) = new_store();
1159 store.set("default", "seed", serde_json::json!(0)).unwrap();
1161 let legacy_path = tmp.path().join("default.json");
1162 let mut existing: HashMap<String, Value> =
1163 serde_json::from_str(&std::fs::read_to_string(&legacy_path).unwrap()).unwrap();
1164 existing.insert("counter:cnt".to_string(), serde_json::json!(7));
1165 std::fs::write(
1166 &legacy_path,
1167 serde_json::to_string_pretty(&existing).unwrap(),
1168 )
1169 .unwrap();
1170
1171 let result = store.incr("default", "counter:cnt", 3.0, 0.0).unwrap();
1174 assert_eq!(result, 10.0);
1175 let dispatched = tmp.path().join("counter").join("cnt.json");
1176 assert!(dispatched.exists(), "dispatched file must be created");
1177
1178 let result2 = store.incr("default", "counter:cnt", 2.0, 0.0).unwrap();
1180 assert_eq!(result2, 12.0);
1181 }
1182
1183 #[test]
1186 fn is_safe_segment_validates_path_safety() {
1187 assert!(is_safe_segment("flow_orch"));
1188 assert!(is_safe_segment("abc-123"));
1189 assert!(is_safe_segment("v1.2.3"));
1190 assert!(!is_safe_segment(""));
1191 assert!(!is_safe_segment("."));
1192 assert!(!is_safe_segment(".."));
1193 assert!(!is_safe_segment("a..b"));
1194 assert!(!is_safe_segment("a/b"));
1195 assert!(!is_safe_segment("a\\b"));
1196 assert!(!is_safe_segment("a b"));
1197 assert!(!is_safe_segment("a\0b"));
1198 }
1199
1200 mod dispatched_layout {
1203 use super::*;
1204
1205 fn seed(tmp: &TempDir, ns: &str, key: &str, value: serde_json::Value) {
1208 let dir = tmp.path().join(ns);
1209 fs::create_dir_all(&dir).expect("create ns dir");
1211 let path = dir.join(format!("{key}.json"));
1212 fs::write(
1213 path,
1214 serde_json::to_string_pretty(&value).expect("serialize"),
1215 )
1216 .expect("write seed file");
1217 }
1218
1219 #[test]
1221 fn list_returns_json_keys_only() {
1222 let (store, tmp) = new_store();
1223 seed(&tmp, "myns", "alpha", serde_json::json!(1));
1224 seed(&tmp, "myns", "beta", serde_json::json!(2));
1225 let ns_dir = tmp.path().join("myns");
1227 fs::write(ns_dir.join("alpha.json.bak"), b"backup").expect("write bak");
1228 fs::write(ns_dir.join("alpha.json.tmp"), b"tmp").expect("write tmp");
1229 fs::write(ns_dir.join("notes.txt"), b"text").expect("write txt");
1230
1231 let keys = store.list_dispatched("myns").unwrap();
1232 assert_eq!(
1233 keys,
1234 vec!["alpha", "beta"],
1235 "must be sorted, .bak/.tmp/.txt excluded"
1236 );
1237 }
1238
1239 #[test]
1242 fn list_returns_empty_for_absent_namespace() {
1243 let (store, _tmp) = new_store();
1244 let keys = store.list_dispatched("ghost").unwrap();
1245 assert!(keys.is_empty(), "absent namespace should return empty Vec");
1246 }
1247
1248 #[test]
1251 fn list_returns_empty_when_only_non_json_files_present() {
1252 let (store, tmp) = new_store();
1253 let ns_dir = tmp.path().join("empty_ns");
1254 fs::create_dir_all(&ns_dir).expect("create dir");
1256 fs::write(ns_dir.join("readme.txt"), b"hi").expect("write");
1257 let keys = store.list_dispatched("empty_ns").unwrap();
1258 assert!(keys.is_empty());
1259 }
1260
1261 #[test]
1264 fn show_returns_key_not_found_for_absent_namespace() {
1265 let (store, _tmp) = new_store();
1266 let err = store.show_dispatched("nodir", "anykey").unwrap_err();
1267 assert!(
1268 matches!(err, StateError::KeyNotFound { .. }),
1269 "expected KeyNotFound, got: {err}"
1270 );
1271 assert!(err.to_string().contains("not found"), "{err}");
1273 }
1274
1275 #[test]
1278 fn show_returns_key_not_found_for_absent_key() {
1279 let (store, tmp) = new_store();
1280 let ns_dir = tmp.path().join("myns2");
1282 fs::create_dir_all(&ns_dir).expect("create dir");
1284
1285 let err = store.show_dispatched("myns2", "missing").unwrap_err();
1286 assert!(
1287 matches!(err, StateError::KeyNotFound { .. }),
1288 "expected KeyNotFound, got: {err}"
1289 );
1290 }
1291
1292 #[test]
1294 fn show_returns_full_value_for_existing_key() {
1295 let (store, tmp) = new_store();
1296 let expected = serde_json::json!({"data": {"completed_steps": ["a", "b"], "x": 42}});
1297 seed(&tmp, "showns", "task1", expected.clone());
1298
1299 let result = store.show_dispatched("showns", "task1").unwrap();
1300 assert_eq!(result, expected);
1301 }
1302 }
1303
1304 mod reset_atomicity {
1305 use super::*;
1306
1307 fn seed(tmp: &TempDir, ns: &str, key: &str, value: serde_json::Value) {
1309 let dir = tmp.path().join(ns);
1310 fs::create_dir_all(&dir).expect("create ns dir");
1312 let path = dir.join(format!("{key}.json"));
1313 fs::write(
1314 path,
1315 serde_json::to_string_pretty(&value).expect("serialize"),
1316 )
1317 .expect("write seed");
1318 }
1319
1320 #[test]
1323 fn reset_removes_steps_and_fields_and_creates_backup() {
1324 let (store, tmp) = new_store();
1325 let original = serde_json::json!({
1326 "data": {
1327 "completed_steps": ["a", "b", "c"],
1328 "x": 1,
1329 "y": "hello"
1330 }
1331 });
1332 seed(&tmp, "testns", "task1", original.clone());
1333
1334 let report = store
1335 .reset_dispatched_with_backup(
1336 "testns",
1337 "task1",
1338 &["b".to_string()],
1339 &["x".to_string()],
1340 )
1341 .unwrap();
1342
1343 let bak_path = tmp.path().join("testns").join("task1.json.bak");
1345 assert!(
1346 bak_path.exists(),
1347 ".bak file must exist at {}",
1348 bak_path.display()
1349 );
1350 assert_eq!(report.backup_path, bak_path);
1351 let bak_content: serde_json::Value =
1352 serde_json::from_str(&fs::read_to_string(&bak_path).expect("read bak"))
1353 .expect("parse bak");
1354 assert_eq!(bak_content, original, ".bak must contain original content");
1355
1356 let live_path = tmp.path().join("testns").join("task1.json");
1358 let live_content: serde_json::Value =
1359 serde_json::from_str(&fs::read_to_string(&live_path).expect("read live"))
1360 .expect("parse live");
1361 let expected = serde_json::json!({
1362 "data": {
1363 "completed_steps": ["a", "c"],
1364 "y": "hello"
1365 }
1366 });
1367 assert_eq!(live_content, expected, "live file must be mutated");
1368
1369 assert_eq!(report.steps_removed, 1, "one step removed");
1371 assert_eq!(report.fields_removed, 1, "one field removed");
1372 }
1373
1374 #[test]
1376 fn reset_removes_multiple_steps_and_fields() {
1377 let (store, tmp) = new_store();
1378 let original = serde_json::json!({
1379 "data": {
1380 "completed_steps": ["s1", "s2", "s3", "s4"],
1381 "repo_readiness": "NOT_READY",
1382 "repo_readiness_report": "details here",
1383 "plan_gate_retries": 2
1384 }
1385 });
1386 seed(&tmp, "orchns", "task-abc", original.clone());
1387
1388 let report = store
1389 .reset_dispatched_with_backup(
1390 "orchns",
1391 "task-abc",
1392 &["s2".to_string(), "s3".to_string()],
1393 &[
1394 "repo_readiness".to_string(),
1395 "repo_readiness_report".to_string(),
1396 ],
1397 )
1398 .unwrap();
1399
1400 let live_path = tmp.path().join("orchns").join("task-abc.json");
1401 let live: serde_json::Value =
1402 serde_json::from_str(&fs::read_to_string(&live_path).expect("read"))
1403 .expect("parse");
1404 assert_eq!(
1405 live["data"]["completed_steps"],
1406 serde_json::json!(["s1", "s4"])
1407 );
1408 assert!(live["data"].get("repo_readiness").is_none());
1409 assert!(live["data"].get("repo_readiness_report").is_none());
1410 assert_eq!(live["data"]["plan_gate_retries"], 2);
1411
1412 assert_eq!(report.steps_removed, 2);
1413 assert_eq!(report.fields_removed, 2);
1414 }
1415
1416 #[test]
1418 fn reset_returns_key_not_found_for_absent_file() {
1419 let (store, _tmp) = new_store();
1420 let err = store
1421 .reset_dispatched_with_backup("ns", "missing", &[], &[])
1422 .unwrap_err();
1423 assert!(
1424 matches!(err, StateError::KeyNotFound { .. }),
1425 "expected KeyNotFound, got: {err}"
1426 );
1427 }
1428
1429 #[test]
1431 fn reset_returns_shape_invalid_when_data_absent() {
1432 let (store, tmp) = new_store();
1433 let bad = serde_json::json!({"identity": {"task_id": "t1"}});
1435 let dir = tmp.path().join("badns");
1436 fs::create_dir_all(&dir).expect("create dir");
1438 fs::write(
1439 dir.join("k.json"),
1440 serde_json::to_string_pretty(&bad).expect("ser"),
1441 )
1442 .expect("write");
1443
1444 let err = store
1445 .reset_dispatched_with_backup("badns", "k", &["s".to_string()], &[])
1446 .unwrap_err();
1447 assert!(
1448 matches!(err, StateError::ShapeInvalid { .. }),
1449 "expected ShapeInvalid, got: {err}"
1450 );
1451 assert!(err.to_string().contains("data"), "{err}");
1452 }
1453
1454 #[test]
1457 fn reset_returns_shape_invalid_when_completed_steps_not_array() {
1458 let (store, tmp) = new_store();
1459 let bad = serde_json::json!({"data": {"completed_steps": {"step": "a"}}});
1461 let dir = tmp.path().join("badns2");
1462 fs::create_dir_all(&dir).expect("create dir");
1464 fs::write(
1465 dir.join("k.json"),
1466 serde_json::to_string_pretty(&bad).expect("ser"),
1467 )
1468 .expect("write");
1469
1470 let err = store
1471 .reset_dispatched_with_backup("badns2", "k", &["a".to_string()], &[])
1472 .unwrap_err();
1473 assert!(
1474 matches!(err, StateError::ShapeInvalid { .. }),
1475 "expected ShapeInvalid, got: {err}"
1476 );
1477 assert!(
1478 err.to_string().contains("completed_steps"),
1479 "message should mention completed_steps: {err}"
1480 );
1481 }
1482 }
1483
1484 mod path_traversal {
1485 use super::*;
1486
1487 #[test]
1489 fn list_rejects_unsafe_namespace() {
1490 let (store, _tmp) = new_store();
1491 let err = store.list_dispatched("../evil").unwrap_err();
1492 assert!(
1493 matches!(
1494 err,
1495 StateError::UnsafeSegment {
1496 which: "namespace",
1497 ..
1498 }
1499 ),
1500 "expected UnsafeSegment{{namespace}}, got: {err}"
1501 );
1502 }
1503
1504 #[test]
1506 fn show_rejects_unsafe_key() {
1507 let (store, _tmp) = new_store();
1508 let err = store.show_dispatched("ns", "foo/bar").unwrap_err();
1509 assert!(
1510 matches!(err, StateError::UnsafeSegment { which: "key", .. }),
1511 "expected UnsafeSegment{{key}}, got: {err}"
1512 );
1513 }
1514
1515 #[test]
1517 fn reset_rejects_empty_namespace() {
1518 let (store, _tmp) = new_store();
1519 let err = store
1520 .reset_dispatched_with_backup("", "key", &[], &[])
1521 .unwrap_err();
1522 assert!(
1523 matches!(
1524 err,
1525 StateError::UnsafeSegment {
1526 which: "namespace",
1527 ..
1528 }
1529 ),
1530 "expected UnsafeSegment{{namespace}}, got: {err}"
1531 );
1532 }
1533
1534 #[test]
1536 fn reset_rejects_dotdot_key() {
1537 let (store, _tmp) = new_store();
1538 let err = store
1539 .reset_dispatched_with_backup("ns", "..", &[], &[])
1540 .unwrap_err();
1541 assert!(
1542 matches!(err, StateError::UnsafeSegment { which: "key", .. }),
1543 "expected UnsafeSegment{{key}}, got: {err}"
1544 );
1545 }
1546 }
1547}
1548
1549#[cfg(test)]
1550mod proptests {
1551 use super::*;
1552 use proptest::prelude::*;
1553
1554 fn new_store() -> (JsonFileStore, tempfile::TempDir) {
1555 let tmp = tempfile::tempdir().unwrap();
1556 let store = JsonFileStore::new(tmp.path().to_path_buf());
1557 (store, tmp)
1558 }
1559
1560 proptest! {
1561 #[test]
1563 fn roundtrip_arbitrary_values(
1564 key in "[a-z]{1,20}",
1565 val in any::<i64>(),
1566 ) {
1567 let (store, _tmp) = new_store();
1568 let ns = "rt";
1569 let json_val = serde_json::json!(val);
1570 store.set(ns, &key, json_val.clone()).unwrap();
1571 let got = store.get(ns, &key).unwrap();
1572 prop_assert_eq!(got, Some(json_val));
1573 let _ = store.delete(ns, &key);
1574 }
1575
1576 #[test]
1578 fn traversal_always_rejected(
1579 prefix in "[a-z]{0,5}",
1580 suffix in "[a-z]{0,5}",
1581 ) {
1582 let (store, _tmp) = new_store();
1583 let evil = format!("{prefix}/../{suffix}");
1584 prop_assert!(store.state_path(&evil).is_err());
1585 }
1586
1587 #[test]
1589 fn nul_byte_always_rejected(
1590 prefix in "[a-z]{0,10}",
1591 suffix in "[a-z]{0,10}",
1592 ) {
1593 let (store, _tmp) = new_store();
1594 let evil = format!("{prefix}\0{suffix}");
1595 prop_assert!(store.state_path(&evil).is_err());
1596 }
1597 }
1598}