1use chrono::{DateTime, Duration, Utc};
22use parking_lot::Mutex;
23use serde::{Deserialize, Serialize};
24use serde_json::Value;
25use std::collections::HashMap;
26use std::fs::{File, OpenOptions};
27use std::io::{BufRead, BufReader, BufWriter, Write};
28use std::path::{Path, PathBuf};
29
30#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
37pub struct StateTransition {
38 pub key: String,
39 pub old_value: Option<Value>,
40 pub new_value: Option<Value>,
41 pub action_id: String,
42 pub timestamp: DateTime<Utc>,
43 #[serde(default, skip_serializing_if = "Option::is_none")]
44 pub ttl_secs: Option<u64>,
45}
46
47pub struct StateStore {
54 state: Mutex<HashMap<String, Value>>,
55 transitions: Mutex<Vec<StateTransition>>,
56 journal: Mutex<Option<Journal>>,
61}
62
63struct Journal {
64 path: PathBuf,
65 writer: BufWriter<File>,
66}
67
68impl StateStore {
69 pub fn new() -> Self {
70 Self {
71 state: Mutex::new(HashMap::new()),
72 transitions: Mutex::new(Vec::new()),
73 journal: Mutex::new(None),
74 }
75 }
76
77 pub fn durable(path: impl Into<PathBuf>) -> std::io::Result<Self> {
88 let path = path.into();
89 if let Some(parent) = path.parent() {
90 if !parent.as_os_str().is_empty() {
91 std::fs::create_dir_all(parent)?;
92 }
93 }
94 let store = Self::new();
95 store.replay_journal(&path)?;
96 let file = OpenOptions::new().create(true).append(true).open(&path)?;
97 *store.journal.lock() = Some(Journal {
98 path,
99 writer: BufWriter::new(file),
100 });
101 Ok(store)
102 }
103
104 fn replay_journal(&self, path: &Path) -> std::io::Result<()> {
105 if !path.exists() {
106 return Ok(());
107 }
108 let file = File::open(path)?;
109 let reader = BufReader::new(file);
110 let now = Utc::now();
111 let mut state = self.state.lock();
112 let mut transitions = self.transitions.lock();
113 for line in reader.lines() {
114 let line = match line {
115 Ok(l) if l.trim().is_empty() => continue,
116 Ok(l) => l,
117 Err(_) => continue,
118 };
119 let Ok(t) = serde_json::from_str::<StateTransition>(&line) else {
120 tracing::warn!(
122 journal = %path.display(),
123 "skipping malformed StateStore journal line"
124 );
125 continue;
126 };
127 if let (Some(ttl), Some(value)) = (t.ttl_secs, &t.new_value) {
131 if now.signed_duration_since(t.timestamp) > Duration::seconds(ttl as i64) {
132 state.remove(&t.key);
133 } else {
134 state.insert(t.key.clone(), value.clone());
135 }
136 } else if let Some(value) = &t.new_value {
137 state.insert(t.key.clone(), value.clone());
138 } else {
139 state.remove(&t.key);
140 }
141 transitions.push(t);
142 }
143 Ok(())
144 }
145
146 fn append_journal(&self, transition: &StateTransition) {
147 let mut journal = self.journal.lock();
148 let Some(journal) = journal.as_mut() else {
149 return;
150 };
151 let Ok(json) = serde_json::to_string(transition) else {
155 return;
156 };
157 if let Err(e) = writeln!(journal.writer, "{json}") {
158 tracing::warn!(
159 journal = %journal.path.display(),
160 error = %e,
161 "StateStore journal append failed"
162 );
163 return;
164 }
165 let _ = journal.writer.flush();
166 }
167
168 pub fn sync(&self) -> std::io::Result<()> {
171 let mut journal = self.journal.lock();
172 let Some(journal) = journal.as_mut() else {
173 return Ok(());
174 };
175 journal.writer.flush()?;
176 journal.writer.get_ref().sync_all()
177 }
178
179 pub fn reap_expired(&self, now: DateTime<Utc>) -> std::io::Result<Vec<String>> {
195 let mut state = self.state.lock();
196 let mut transitions = self.transitions.lock();
197 let mut latest_by_key: HashMap<&str, &StateTransition> = HashMap::new();
201 for t in transitions.iter() {
202 latest_by_key.insert(t.key.as_str(), t);
203 }
204 let expired: Vec<String> = latest_by_key
205 .iter()
206 .filter_map(|(_k, t)| {
207 let ttl = t.ttl_secs?;
208 if t.new_value.is_none() {
209 return None;
210 }
211 let age = now.signed_duration_since(t.timestamp);
212 (age > Duration::seconds(ttl as i64)).then(|| t.key.clone())
213 })
214 .collect();
215 let mut reaped = Vec::new();
216 for key in expired {
217 if state.remove(&key).is_some() {
218 reaped.push(key.clone());
219 transitions.push(StateTransition {
220 key,
221 old_value: None,
222 new_value: None,
223 action_id: "reap".to_string(),
224 timestamp: now,
225 ttl_secs: None,
226 });
227 }
228 }
229 drop(state);
230 drop(transitions);
231 if !reaped.is_empty() {
232 self.compact_journal()?;
233 }
234 Ok(reaped)
235 }
236
237 pub(crate) fn compact_journal(&self) -> std::io::Result<()> {
249 let mut journal = self.journal.lock();
250 let Some(j) = journal.as_mut() else {
251 return Ok(());
252 };
253 let state = self.state.lock().clone();
254 let tmp_path = j.path.with_extension("jsonl.tmp");
255 {
256 let tmp_file = File::create(&tmp_path)?;
257 let mut writer = BufWriter::new(tmp_file);
258 for (key, value) in &state {
259 let t = StateTransition {
260 key: key.clone(),
261 old_value: None,
262 new_value: Some(value.clone()),
263 action_id: "compact".to_string(),
264 timestamp: Utc::now(),
265 ttl_secs: None,
266 };
267 let line = serde_json::to_string(&t)?;
268 writeln!(writer, "{line}")?;
269 }
270 writer.flush()?;
271 writer.get_ref().sync_all()?;
272 }
273 std::fs::rename(&tmp_path, &j.path)?;
274 let file = OpenOptions::new().create(true).append(true).open(&j.path)?;
275 j.writer = BufWriter::new(file);
276 Ok(())
277 }
278
279 pub fn get(&self, key: &str) -> Option<Value> {
280 self.state.lock().get(key).cloned()
281 }
282
283 pub fn get_or(&self, key: &str, default: Value) -> Value {
284 self.state.lock().get(key).cloned().unwrap_or(default)
285 }
286
287 pub fn exists(&self, key: &str) -> bool {
288 self.state.lock().contains_key(key)
289 }
290
291 pub fn set(&self, key: &str, value: Value, action_id: &str) -> StateTransition {
292 self.set_inner(key, value, action_id, None)
293 }
294
295 pub fn set_with_ttl(
307 &self,
308 key: &str,
309 value: Value,
310 action_id: &str,
311 ttl_secs: u64,
312 ) -> StateTransition {
313 self.set_inner(key, value, action_id, Some(ttl_secs))
314 }
315
316 fn set_inner(
317 &self,
318 key: &str,
319 value: Value,
320 action_id: &str,
321 ttl_secs: Option<u64>,
322 ) -> StateTransition {
323 let mut state = self.state.lock();
324 let old = state.get(key).cloned();
325 state.insert(key.to_string(), value.clone());
326
327 let t = StateTransition {
328 key: key.to_string(),
329 old_value: old,
330 new_value: Some(value),
331 action_id: action_id.to_string(),
332 timestamp: Utc::now(),
333 ttl_secs,
334 };
335
336 self.transitions.lock().push(t.clone());
337 self.append_journal(&t);
338 t
339 }
340
341 pub fn delete(&self, key: &str, action_id: &str) -> Option<StateTransition> {
342 let mut state = self.state.lock();
343 let old = state.remove(key)?;
344
345 let t = StateTransition {
346 key: key.to_string(),
347 old_value: Some(old),
348 new_value: None,
349 action_id: action_id.to_string(),
350 timestamp: Utc::now(),
351 ttl_secs: None,
352 };
353
354 self.transitions.lock().push(t.clone());
355 self.append_journal(&t);
356 Some(t)
357 }
358
359 pub fn snapshot(&self) -> HashMap<String, Value> {
361 self.state.lock().clone()
362 }
363
364 pub fn restore(&self, snapshot: HashMap<String, Value>, transition_count: usize) {
366 *self.state.lock() = snapshot;
367 self.transitions.lock().truncate(transition_count);
368 }
369
370 pub fn transition_count(&self) -> usize {
371 self.transitions.lock().len()
372 }
373
374 pub fn transitions(&self) -> Vec<StateTransition> {
375 self.transitions.lock().clone()
376 }
377
378 pub fn transitions_since(&self, index: usize) -> Vec<StateTransition> {
379 let transitions = self.transitions.lock();
380 let start = index.min(transitions.len());
381 transitions[start..].to_vec()
382 }
383
384 pub fn keys(&self) -> Vec<String> {
385 self.state.lock().keys().cloned().collect()
386 }
387
388 pub fn replace_all(&self, snapshot: HashMap<String, Value>) {
393 *self.state.lock() = snapshot;
394 self.transitions.lock().clear();
395 }
396
397 pub fn scoped<'a>(&'a self, tenant: Option<&'a str>) -> ScopedStateView<'a> {
410 ScopedStateView {
411 store: self,
412 tenant,
413 }
414 }
415}
416
417pub struct ScopedStateView<'a> {
448 store: &'a StateStore,
449 tenant: Option<&'a str>,
450}
451
452impl<'a> ScopedStateView<'a> {
453 fn full_key(&self, key: &str) -> String {
454 match self.tenant {
455 Some(t) if !t.is_empty() => format!("tenant:{t}:{key}"),
456 _ => key.to_string(),
457 }
458 }
459
460 fn strip_prefix<'k>(&self, full: &'k str) -> Option<&'k str> {
461 match self.tenant {
462 Some(t) if !t.is_empty() => {
463 let prefix = format!("tenant:{t}:");
464 full.strip_prefix(&prefix)
465 }
466 _ => Some(full),
467 }
468 }
469
470 pub fn get(&self, key: &str) -> Option<Value> {
471 self.store.get(&self.full_key(key))
472 }
473
474 pub fn get_or(&self, key: &str, default: Value) -> Value {
475 self.store.get_or(&self.full_key(key), default)
476 }
477
478 pub fn exists(&self, key: &str) -> bool {
479 self.store.exists(&self.full_key(key))
480 }
481
482 pub fn set(&self, key: &str, value: Value, action_id: &str) -> StateTransition {
483 self.store.set(&self.full_key(key), value, action_id)
484 }
485
486 pub fn set_with_ttl(
487 &self,
488 key: &str,
489 value: Value,
490 action_id: &str,
491 ttl_secs: u64,
492 ) -> StateTransition {
493 self.store
494 .set_with_ttl(&self.full_key(key), value, action_id, ttl_secs)
495 }
496
497 pub fn delete(&self, key: &str, action_id: &str) -> Option<StateTransition> {
498 self.store.delete(&self.full_key(key), action_id)
499 }
500
501 pub fn keys(&self) -> Vec<String> {
507 self.store
508 .keys()
509 .into_iter()
510 .filter_map(|k| {
511 if self.tenant.map(|t| !t.is_empty()).unwrap_or(false) {
512 self.strip_prefix(&k).map(str::to_string)
513 } else if k.starts_with("tenant:") {
514 None
515 } else {
516 Some(k)
517 }
518 })
519 .collect()
520 }
521}
522
523impl Default for StateStore {
524 fn default() -> Self {
525 Self::new()
526 }
527}
528
529impl car_ir::precondition::StateView for StateStore {
530 fn get_value(&self, key: &str) -> Option<Value> {
531 self.get(key)
532 }
533 fn key_exists(&self, key: &str) -> bool {
534 self.exists(key)
535 }
536}
537
538#[cfg(test)]
539mod tests {
540 use super::*;
541 use serde_json::json;
542
543 #[test]
544 fn set_and_get() {
545 let store = StateStore::new();
546 store.set("x", Value::from(42), "test");
547 assert_eq!(store.get("x"), Some(Value::from(42)));
548 }
549
550 #[test]
551 fn exists() {
552 let store = StateStore::new();
553 assert!(!store.exists("x"));
554 store.set("x", Value::from(1), "test");
555 assert!(store.exists("x"));
556 }
557
558 #[test]
559 fn delete() {
560 let store = StateStore::new();
561 store.set("x", Value::from(1), "test");
562 let t = store.delete("x", "test");
563 assert!(t.is_some());
564 assert!(!store.exists("x"));
565 }
566
567 #[test]
568 fn delete_nonexistent() {
569 let store = StateStore::new();
570 assert!(store.delete("x", "test").is_none());
571 }
572
573 #[test]
574 fn snapshot_and_restore() {
575 let store = StateStore::new();
576 store.set("x", Value::from(1), "a");
577 let snap = store.snapshot();
578 let tc = store.transition_count();
579
580 store.set("y", Value::from(2), "b");
581 assert!(store.exists("y"));
582
583 store.restore(snap, tc);
584 assert!(store.exists("x"));
585 assert!(!store.exists("y"));
586 assert_eq!(store.transition_count(), 1);
587 }
588
589 #[test]
590 fn transitions_logged() {
591 let store = StateStore::new();
592 store.set("a", Value::from(1), "act1");
593 store.set("b", Value::from(2), "act2");
594
595 let transitions = store.transitions();
596 assert_eq!(transitions.len(), 2);
597 assert_eq!(transitions[0].key, "a");
598 assert_eq!(transitions[1].key, "b");
599 }
600
601 #[test]
602 fn transitions_since() {
603 let store = StateStore::new();
604 store.set("a", Value::from(1), "act1");
605 let idx = store.transition_count();
606 store.set("b", Value::from(2), "act2");
607
608 let since = store.transitions_since(idx);
609 assert_eq!(since.len(), 1);
610 assert_eq!(since[0].key, "b");
611 }
612
613 #[test]
614 fn transition_records_old_value() {
615 let store = StateStore::new();
616 store.set("x", Value::from(1), "first");
617 store.set("x", Value::from(2), "second");
618
619 let transitions = store.transitions();
620 assert_eq!(transitions[1].old_value, Some(Value::from(1)));
621 assert_eq!(transitions[1].new_value, Some(Value::from(2)));
622 }
623
624 #[test]
625 fn keys() {
626 let store = StateStore::new();
627 store.set("a", Value::from(1), "t");
628 store.set("b", Value::from(2), "t");
629 let mut keys = store.keys();
630 keys.sort();
631 assert_eq!(keys, vec!["a", "b"]);
632 }
633
634 #[test]
635 fn transitions_since_after_restore_does_not_panic() {
636 let store = StateStore::new();
637 store.set("a", serde_json::json!(1), "test");
638 store.set("b", serde_json::json!(2), "test");
639 let count_before = store.transition_count(); store.restore(HashMap::new(), 0);
643
644 let result = store.transitions_since(count_before);
646 assert!(result.is_empty());
647 }
648
649 #[test]
650 fn transitions_since_normal_usage() {
651 let store = StateStore::new();
652 store.set("a", serde_json::json!(1), "test");
653 let mark = store.transition_count();
654 store.set("b", serde_json::json!(2), "test");
655 let since = store.transitions_since(mark);
656 assert_eq!(since.len(), 1);
657 assert_eq!(since[0].key, "b");
658 }
659
660 #[test]
661 fn replace_all_swaps_state_without_transitions() {
662 let store = StateStore::new();
663 store.set("old_key", serde_json::json!("old"), "setup");
664
665 let mut new_state = HashMap::new();
666 new_state.insert("new_key".to_string(), serde_json::json!("new"));
667 store.replace_all(new_state);
668
669 assert_eq!(store.get("new_key"), Some(serde_json::json!("new")));
670 assert_eq!(store.get("old_key"), None);
671 assert_eq!(store.transition_count(), 0);
673 }
674
675 #[test]
676 fn durable_store_survives_reopen() {
677 let dir = tempfile::tempdir().unwrap();
678 let path = dir.path().join("state.jsonl");
679 {
680 let store = StateStore::durable(&path).unwrap();
681 store.set("agent", serde_json::json!("planner"), "boot");
682 store.set("turns", serde_json::json!(42), "tick");
683 store.sync().unwrap();
684 }
685 let store = StateStore::durable(&path).unwrap();
686 assert_eq!(store.get("agent"), Some(serde_json::json!("planner")));
687 assert_eq!(store.get("turns"), Some(serde_json::json!(42)));
688 }
689
690 #[test]
691 fn durable_store_replays_deletes() {
692 let dir = tempfile::tempdir().unwrap();
693 let path = dir.path().join("state.jsonl");
694 {
695 let store = StateStore::durable(&path).unwrap();
696 store.set("transient", serde_json::json!("x"), "boot");
697 store.delete("transient", "rm");
698 store.sync().unwrap();
699 }
700 let store = StateStore::durable(&path).unwrap();
701 assert!(!store.exists("transient"));
702 }
703
704 #[test]
705 fn ttl_reap_drops_expired_and_keeps_fresh() {
706 let store = StateStore::new();
707 store.set_with_ttl("short", serde_json::json!(1), "set", 0);
708 store.set_with_ttl("long", serde_json::json!(2), "set", 3600);
709 store.set("forever", serde_json::json!(3), "set");
710 let reaped = store
712 .reap_expired(Utc::now() + Duration::seconds(10))
713 .unwrap();
714 assert_eq!(reaped, vec!["short".to_string()]);
715 assert!(!store.exists("short"));
716 assert_eq!(store.get("long"), Some(serde_json::json!(2)));
717 assert_eq!(store.get("forever"), Some(serde_json::json!(3)));
718 }
719
720 #[test]
721 fn durable_ttl_compacts_journal() {
722 let dir = tempfile::tempdir().unwrap();
723 let path = dir.path().join("state.jsonl");
724 {
725 let store = StateStore::durable(&path).unwrap();
726 for i in 0..50 {
727 store.set_with_ttl(&format!("k{i}"), serde_json::json!(i), "set", 0);
728 }
729 store.set("survivor", serde_json::json!("kept"), "set");
730 store.sync().unwrap();
731 let pre = std::fs::metadata(&path).unwrap().len();
732 let reaped = store
734 .reap_expired(Utc::now() + Duration::seconds(1))
735 .unwrap();
736 assert_eq!(reaped.len(), 50);
737 store.sync().unwrap();
738 let post = std::fs::metadata(&path).unwrap().len();
739 assert!(
742 post < pre,
743 "post={post} pre={pre} — compaction did not shrink"
744 );
745 }
746 let store = StateStore::durable(&path).unwrap();
748 assert!(!store.exists("k0"));
749 assert!(!store.exists("k49"));
750 assert_eq!(store.get("survivor"), Some(serde_json::json!("kept")));
751 }
752
753 #[test]
754 fn ttl_then_rewrite_without_ttl_does_not_reap() {
755 let store = StateStore::new();
756 store.set_with_ttl("k", serde_json::json!("a"), "first", 0);
757 store.set("k", serde_json::json!("b"), "second"); let reaped = store
759 .reap_expired(Utc::now() + Duration::seconds(10))
760 .unwrap();
761 assert!(reaped.is_empty());
762 assert_eq!(store.get("k"), Some(serde_json::json!("b")));
763 }
764
765 #[test]
766 fn malformed_journal_line_is_skipped_not_fatal() {
767 let dir = tempfile::tempdir().unwrap();
768 let path = dir.path().join("state.jsonl");
769 {
771 std::fs::write(
772 &path,
773 "{\"key\":\"a\",\"old_value\":null,\"new_value\":1,\"action_id\":\"x\",\"timestamp\":\"2026-05-11T00:00:00Z\"}\n\
774 not-json\n\
775 {\"key\":\"b\",\"old_value\":null,\"new_value\":2,\"action_id\":\"x\",\"timestamp\":\"2026-05-11T00:00:00Z\"}\n",
776 )
777 .unwrap();
778 }
779 let store = StateStore::durable(&path).unwrap();
780 assert_eq!(store.get("a"), Some(serde_json::json!(1)));
781 assert_eq!(store.get("b"), Some(serde_json::json!(2)));
782 }
783
784 #[test]
787 fn scoped_view_writes_isolate_between_tenants() {
788 let store = StateStore::new();
789 store.scoped(Some("acme")).set("config", json!("A"), "act");
790 store
791 .scoped(Some("globex"))
792 .set("config", json!("G"), "act");
793
794 assert_eq!(store.scoped(Some("acme")).get("config"), Some(json!("A")));
796 assert_eq!(store.scoped(Some("globex")).get("config"), Some(json!("G")));
797 }
798
799 #[test]
800 fn scoped_view_isolates_existence_check() {
801 let store = StateStore::new();
802 store.scoped(Some("acme")).set("k", json!(1), "act");
803 assert!(store.scoped(Some("acme")).exists("k"));
804 assert!(!store.scoped(Some("globex")).exists("k"));
805 }
806
807 #[test]
808 fn scoped_view_keys_filters_to_tenant() {
809 let store = StateStore::new();
810 store.scoped(Some("acme")).set("a", json!(1), "act");
811 store.scoped(Some("acme")).set("b", json!(2), "act");
812 store.scoped(Some("globex")).set("g", json!(9), "act");
813 store.set("unscoped", json!(0), "act");
814
815 let mut acme_keys = store.scoped(Some("acme")).keys();
816 acme_keys.sort();
817 assert_eq!(acme_keys, vec!["a", "b"]);
818
819 let globex_keys = store.scoped(Some("globex")).keys();
820 assert_eq!(globex_keys, vec!["g"]);
821 }
822
823 #[test]
824 fn unscoped_view_skips_tenant_prefixed_keys() {
825 let store = StateStore::new();
831 store.set("legacy", json!("ok"), "act");
832 store.scoped(Some("acme")).set("hidden", json!(42), "act");
833
834 let unscoped = store.scoped(None).keys();
835 assert_eq!(unscoped, vec!["legacy"]);
836 assert!(store.scoped(None).get("hidden").is_none());
837 }
838
839 #[test]
840 fn scoped_view_delete_doesnt_touch_other_tenants() {
841 let store = StateStore::new();
842 store.scoped(Some("acme")).set("shared", json!(1), "act");
843 store.scoped(Some("globex")).set("shared", json!(2), "act");
844
845 store.scoped(Some("acme")).delete("shared", "act");
846 assert!(!store.scoped(Some("acme")).exists("shared"));
847 assert!(store.scoped(Some("globex")).exists("shared"));
848 }
849
850 #[test]
851 fn empty_tenant_string_treated_as_unscoped() {
852 let store = StateStore::new();
856 store.scoped(Some("")).set("k", json!(1), "act");
857 assert_eq!(store.get("k"), Some(json!(1)));
858 assert_eq!(store.scoped(None).get("k"), Some(json!(1)));
859 }
860}