use chrono::{DateTime, Duration, Utc};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::path::{Path, PathBuf};
/// One recorded change to a single key of a [`StateStore`].
///
/// Transitions are kept in the store's in-memory log and, for durable stores,
/// written to the journal as one JSON object per line.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct StateTransition {
/// Key whose value changed.
pub key: String,
/// Value held before the change, when the writer recorded one.
pub old_value: Option<Value>,
/// Value after the change; `None` records a removal.
pub new_value: Option<Value>,
/// Caller-supplied identifier of whatever caused the change.
pub action_id: String,
/// Moment the transition was created.
pub timestamp: DateTime<Utc>,
/// Optional time-to-live in seconds, measured from `timestamp`.
/// Left out of the serialized form when `None` and defaulted to `None`
/// when missing on input.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub ttl_secs: Option<u64>,
}
/// Key/value store that logs every change as a [`StateTransition`] and can
/// optionally mirror those changes to an append-only journal file.
pub struct StateStore {
// Current value of every live key.
state: Mutex<HashMap<String, Value>>,
// In-memory log of transitions, in the order they were applied.
transitions: Mutex<Vec<StateTransition>>,
// `Some` only for stores created through `durable`; `None` means in-memory only.
journal: Mutex<Option<Journal>>,
}
/// Open handle on the journal file backing a durable [`StateStore`].
struct Journal {
// Location of the journal file; used for log messages and for compaction.
path: PathBuf,
// Buffered writer over the journal file opened in append mode.
writer: BufWriter<File>,
}
impl StateStore {
/// Creates an empty store that lives in memory only (no journal attached).
pub fn new() -> Self {
    let state = Mutex::new(HashMap::new());
    let transitions = Mutex::new(Vec::new());
    let journal = Mutex::new(None);
    Self {
        state,
        transitions,
        journal,
    }
}
pub fn durable(path: impl Into<PathBuf>) -> std::io::Result<Self> {
let path = path.into();
if let Some(parent) = path.parent() {
if !parent.as_os_str().is_empty() {
std::fs::create_dir_all(parent)?;
}
}
let store = Self::new();
store.replay_journal(&path)?;
let file = OpenOptions::new().create(true).append(true).open(&path)?;
*store.journal.lock() = Some(Journal {
path,
writer: BufWriter::new(file),
});
Ok(store)
}
/// Rebuilds in-memory state and the transition log from the journal at `path`.
///
/// A missing file is treated as an empty journal. Blank lines are ignored;
/// lines that are not valid UTF-8 or not a valid [`StateTransition`] are
/// skipped with a warning. For every parsed transition the key is set when
/// the transition carries a value whose TTL (if any) has not elapsed, and
/// removed otherwise. Every parsed transition is appended to the log.
///
/// # Errors
///
/// Returns the error from opening the file, or any read error other than
/// invalid UTF-8. Propagating those errors (instead of retrying) prevents an
/// endless loop when the reader fails persistently.
fn replay_journal(&self, path: &Path) -> std::io::Result<()> {
    if !path.exists() {
        return Ok(());
    }
    let reader = BufReader::new(File::open(path)?);
    let now = Utc::now();
    let mut state = self.state.lock();
    let mut transitions = self.transitions.lock();
    for line in reader.lines() {
        let line = match line {
            Ok(l) => l,
            // The offending bytes were already consumed, so the next
            // iteration continues with the following line.
            Err(e) if e.kind() == std::io::ErrorKind::InvalidData => {
                tracing::warn!(
                    journal = %path.display(),
                    "skipping unreadable StateStore journal line"
                );
                continue;
            }
            Err(e) => return Err(e),
        };
        if line.trim().is_empty() {
            continue;
        }
        let Ok(t) = serde_json::from_str::<StateTransition>(&line) else {
            tracing::warn!(
                journal = %path.display(),
                "skipping malformed StateStore journal line"
            );
            continue;
        };
        // Compare as std durations: covers the full u64 TTL range without the
        // wrap-around of `ttl as i64` or an out-of-range panic. A timestamp in
        // the future yields a negative age, which never counts as expired.
        let expired = t
            .ttl_secs
            .map(|ttl| {
                now.signed_duration_since(t.timestamp)
                    .to_std()
                    .map(|age| age > std::time::Duration::from_secs(ttl))
                    .unwrap_or(false)
            })
            .unwrap_or(false);
        match &t.new_value {
            Some(value) if !expired => {
                state.insert(t.key.clone(), value.clone());
            }
            _ => {
                state.remove(&t.key);
            }
        }
        transitions.push(t);
    }
    Ok(())
}
/// Appends `transition` to the journal as one JSON line and flushes the writer.
///
/// Does nothing for stores without a journal. Journaling is best-effort:
/// failures never reach the caller, but each failure (serialization, write,
/// flush) is logged as a warning rather than dropped silently.
fn append_journal(&self, transition: &StateTransition) {
    let mut guard = self.journal.lock();
    let Some(journal) = guard.as_mut() else {
        return;
    };
    let json = match serde_json::to_string(transition) {
        Ok(json) => json,
        Err(e) => {
            tracing::warn!(
                journal = %journal.path.display(),
                error = %e,
                "StateStore journal serialization failed"
            );
            return;
        }
    };
    if let Err(e) = writeln!(journal.writer, "{json}") {
        tracing::warn!(
            journal = %journal.path.display(),
            error = %e,
            "StateStore journal append failed"
        );
        return;
    }
    if let Err(e) = journal.writer.flush() {
        tracing::warn!(
            journal = %journal.path.display(),
            error = %e,
            "StateStore journal flush failed"
        );
    }
}
/// Flushes buffered journal data and asks the OS to sync the file to disk.
///
/// Returns `Ok(())` immediately when the store has no journal.
///
/// # Errors
///
/// Returns the error from flushing the writer or from `sync_all`.
pub fn sync(&self) -> std::io::Result<()> {
    let mut guard = self.journal.lock();
    if let Some(open) = guard.as_mut() {
        open.writer.flush()?;
        open.writer.get_ref().sync_all()?;
    }
    Ok(())
}
/// Removes every key whose TTL has elapsed as of `now` and returns those keys.
///
/// Only the most recent transition per key is considered: the key is expired
/// when that transition carries both a value and a TTL, and more than
/// `ttl_secs` seconds have passed since its timestamp. Each removal is logged
/// as a transition with action id `"reap"` holding the removed value as
/// `old_value`. When at least one key was reaped the journal is compacted.
///
/// # Errors
///
/// Returns any I/O error raised by journal compaction.
pub fn reap_expired(&self, now: DateTime<Utc>) -> std::io::Result<Vec<String>> {
    let mut state = self.state.lock();
    let mut transitions = self.transitions.lock();
    // Later entries overwrite earlier ones, leaving the latest transition per key.
    let mut latest_by_key: HashMap<&str, &StateTransition> = HashMap::new();
    for t in transitions.iter() {
        latest_by_key.insert(t.key.as_str(), t);
    }
    let expired: Vec<String> = latest_by_key
        .values()
        .filter(|t| {
            // Compare as std durations: covers the full u64 TTL range without
            // the wrap-around of `ttl as i64` or an out-of-range panic. A
            // negative age (timestamp after `now`) never counts as expired.
            t.new_value.is_some()
                && t.ttl_secs
                    .map(|ttl| {
                        now.signed_duration_since(t.timestamp)
                            .to_std()
                            .map(|age| age > std::time::Duration::from_secs(ttl))
                            .unwrap_or(false)
                    })
                    .unwrap_or(false)
        })
        .map(|t| t.key.clone())
        .collect();
    let mut reaped = Vec::with_capacity(expired.len());
    for key in expired {
        // A key can be missing from `state` (e.g. after `restore`); skip it then.
        if let Some(old) = state.remove(&key) {
            reaped.push(key.clone());
            transitions.push(StateTransition {
                key,
                old_value: Some(old),
                new_value: None,
                action_id: "reap".to_string(),
                timestamp: now,
                ttl_secs: None,
            });
        }
    }
    // Compaction takes these locks itself, so release them first.
    drop(state);
    drop(transitions);
    if !reaped.is_empty() {
        self.compact_journal()?;
    }
    Ok(reaped)
}
pub(crate) fn compact_journal(&self) -> std::io::Result<()> {
let mut journal = self.journal.lock();
let Some(j) = journal.as_mut() else {
return Ok(());
};
let state = self.state.lock().clone();
let tmp_path = j.path.with_extension("jsonl.tmp");
{
let tmp_file = File::create(&tmp_path)?;
let mut writer = BufWriter::new(tmp_file);
for (key, value) in &state {
let t = StateTransition {
key: key.clone(),
old_value: None,
new_value: Some(value.clone()),
action_id: "compact".to_string(),
timestamp: Utc::now(),
ttl_secs: None,
};
let line = serde_json::to_string(&t)?;
writeln!(writer, "{line}")?;
}
writer.flush()?;
writer.get_ref().sync_all()?;
}
std::fs::rename(&tmp_path, &j.path)?;
let file = OpenOptions::new().create(true).append(true).open(&j.path)?;
j.writer = BufWriter::new(file);
Ok(())
}
/// Returns a clone of the value stored under `key`, or `None` if absent.
pub fn get(&self, key: &str) -> Option<Value> {
    let current = self.state.lock();
    current.get(key).cloned()
}
/// Returns a clone of the value stored under `key`, or `default` if absent.
pub fn get_or(&self, key: &str, default: Value) -> Value {
    let current = self.state.lock();
    match current.get(key) {
        Some(found) => found.clone(),
        None => default,
    }
}
/// Reports whether `key` currently has a value.
pub fn exists(&self, key: &str) -> bool {
    let current = self.state.lock();
    current.get(key).is_some()
}
/// Stores `value` under `key` without a TTL and returns the recorded transition.
pub fn set(&self, key: &str, value: Value, action_id: &str) -> StateTransition {
    let no_ttl = None;
    self.set_inner(key, value, action_id, no_ttl)
}
/// Stores `value` under `key` with a time-to-live of `ttl_secs` seconds and
/// returns the recorded transition.
///
/// The TTL is only recorded here; expired keys are removed by
/// `reap_expired` or when the journal is replayed.
pub fn set_with_ttl(
    &self,
    key: &str,
    value: Value,
    action_id: &str,
    ttl_secs: u64,
) -> StateTransition {
    let ttl = Some(ttl_secs);
    self.set_inner(key, value, action_id, ttl)
}
/// Shared implementation of `set` and `set_with_ttl`.
///
/// Writes `value` under `key`, records a transition holding the previous
/// value (if any) and the optional TTL, appends it to the in-memory log and
/// to the journal, and returns it. The state lock is held until the journal
/// append has finished.
fn set_inner(
    &self,
    key: &str,
    value: Value,
    action_id: &str,
    ttl_secs: Option<u64>,
) -> StateTransition {
    let mut state = self.state.lock();
    // `insert` hands back the displaced value, so no separate lookup or
    // clone of the old value is needed.
    let old = state.insert(key.to_string(), value.clone());
    let t = StateTransition {
        key: key.to_string(),
        old_value: old,
        new_value: Some(value),
        action_id: action_id.to_string(),
        timestamp: Utc::now(),
        ttl_secs,
    };
    self.transitions.lock().push(t.clone());
    self.append_journal(&t);
    t
}
/// Removes `key` and returns the recorded transition, or `None` when the key
/// was not present (in which case nothing is logged or journaled).
pub fn delete(&self, key: &str, action_id: &str) -> Option<StateTransition> {
    let mut state = self.state.lock();
    state.remove(key).map(|removed| {
        let record = StateTransition {
            key: key.to_string(),
            old_value: Some(removed),
            new_value: None,
            action_id: action_id.to_string(),
            timestamp: Utc::now(),
            ttl_secs: None,
        };
        self.transitions.lock().push(record.clone());
        self.append_journal(&record);
        record
    })
}
/// Returns a copy of the entire current key/value map.
pub fn snapshot(&self) -> HashMap<String, Value> {
    let current = self.state.lock();
    (*current).clone()
}
/// Replaces the current state with `snapshot` and truncates the in-memory
/// transition log to its first `transition_count` entries.
///
/// Nothing is written to the journal by this call.
pub fn restore(&self, snapshot: HashMap<String, Value>, transition_count: usize) {
    {
        let mut current = self.state.lock();
        *current = snapshot;
    }
    let mut log = self.transitions.lock();
    log.truncate(transition_count);
}
/// Returns how many transitions the in-memory log currently holds.
pub fn transition_count(&self) -> usize {
    let log = self.transitions.lock();
    log.len()
}
/// Returns a copy of the whole in-memory transition log, oldest first.
pub fn transitions(&self) -> Vec<StateTransition> {
    let log = self.transitions.lock();
    log.to_vec()
}
/// Returns copies of the transitions from position `index` onward.
///
/// An `index` at or past the end of the log yields an empty vector.
pub fn transitions_since(&self, index: usize) -> Vec<StateTransition> {
    let log = self.transitions.lock();
    log.iter().skip(index).cloned().collect()
}
/// Returns all keys that currently have a value, in unspecified order.
pub fn keys(&self) -> Vec<String> {
    let current = self.state.lock();
    current.keys().cloned().collect()
}
/// Replaces the current state with `snapshot` and empties the in-memory
/// transition log.
///
/// Nothing is written to the journal by this call.
pub fn replace_all(&self, snapshot: HashMap<String, Value>) {
    {
        let mut current = self.state.lock();
        *current = snapshot;
    }
    let mut log = self.transitions.lock();
    log.clear();
}
/// Returns a view of this store whose keys are namespaced by `tenant`.
///
/// `None` (or an empty tenant) yields a view that uses keys unchanged.
pub fn scoped<'a>(&'a self, tenant: Option<&'a str>) -> ScopedStateView<'a> {
    let store = self;
    ScopedStateView { store, tenant }
}
}
/// Borrowed view of a [`StateStore`] that prefixes keys with a tenant name.
pub struct ScopedStateView<'a> {
// Store that all reads and writes are forwarded to.
store: &'a StateStore,
// Tenant used to build the key prefix; `None` or `""` leaves keys unchanged.
tenant: Option<&'a str>,
}
/// Key-namespacing accessors that forward to the underlying [`StateStore`].
///
/// With a non-empty tenant `t`, a caller key `k` is stored as `tenant:{t}:{k}`.
/// Because `:` is also the separator, a tenant name containing `:` can produce
/// the same stored key as a different tenant/key pair (tenant `a` with key
/// `b:c` and tenant `a:b` with key `c` both map to `tenant:a:b:c`).
impl<'a> ScopedStateView<'a> {
    /// Maps a caller-visible key to the key used in the underlying store.
    fn full_key(&self, key: &str) -> String {
        match self.tenant {
            Some(t) if !t.is_empty() => format!("tenant:{t}:{key}"),
            _ => key.to_string(),
        }
    }
    /// Returns the value stored under `key` in this scope, if any.
    pub fn get(&self, key: &str) -> Option<Value> {
        self.store.get(&self.full_key(key))
    }
    /// Returns the value stored under `key` in this scope, or `default`.
    pub fn get_or(&self, key: &str, default: Value) -> Value {
        self.store.get_or(&self.full_key(key), default)
    }
    /// Reports whether `key` has a value in this scope.
    pub fn exists(&self, key: &str) -> bool {
        self.store.exists(&self.full_key(key))
    }
    /// Stores `value` under `key` in this scope and returns the transition.
    ///
    /// The returned transition carries the full (prefixed) key.
    pub fn set(&self, key: &str, value: Value, action_id: &str) -> StateTransition {
        self.store.set(&self.full_key(key), value, action_id)
    }
    /// Like [`Self::set`], but records a time-to-live of `ttl_secs` seconds.
    pub fn set_with_ttl(
        &self,
        key: &str,
        value: Value,
        action_id: &str,
        ttl_secs: u64,
    ) -> StateTransition {
        self.store
            .set_with_ttl(&self.full_key(key), value, action_id, ttl_secs)
    }
    /// Removes `key` from this scope; `None` when it was not present.
    pub fn delete(&self, key: &str, action_id: &str) -> Option<StateTransition> {
        self.store.delete(&self.full_key(key), action_id)
    }
    /// Lists the keys visible in this scope, in unspecified order.
    ///
    /// For a non-empty tenant these are the tenant's keys with the prefix
    /// removed. Otherwise they are all keys that do not start with `tenant:`.
    pub fn keys(&self) -> Vec<String> {
        let all = self.store.keys();
        match self.tenant.filter(|t| !t.is_empty()) {
            Some(t) => {
                // Build the prefix once instead of once per stored key.
                let prefix = format!("tenant:{t}:");
                all.iter()
                    .filter_map(|k| k.strip_prefix(prefix.as_str()))
                    .map(str::to_string)
                    .collect()
            }
            None => all
                .into_iter()
                .filter(|k| !k.starts_with("tenant:"))
                .collect(),
        }
    }
}
impl Default for StateStore {
fn default() -> Self {
Self::new()
}
}
impl car_ir::precondition::StateView for StateStore {
fn get_value(&self, key: &str) -> Option<Value> {
self.get(key)
}
fn key_exists(&self, key: &str) -> bool {
self.exists(key)
}
}
// Unit tests for `StateStore` (in-memory and durable) and `ScopedStateView`.
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
// A value written with `set` is returned by `get`.
#[test]
fn set_and_get() {
let store = StateStore::new();
store.set("x", Value::from(42), "test");
assert_eq!(store.get("x"), Some(Value::from(42)));
}
// `exists` is false before a write and true after it.
#[test]
fn exists() {
let store = StateStore::new();
assert!(!store.exists("x"));
store.set("x", Value::from(1), "test");
assert!(store.exists("x"));
}
// Deleting a present key returns a transition and removes the key.
#[test]
fn delete() {
let store = StateStore::new();
store.set("x", Value::from(1), "test");
let t = store.delete("x", "test");
assert!(t.is_some());
assert!(!store.exists("x"));
}
// Deleting a missing key returns `None`.
#[test]
fn delete_nonexistent() {
let store = StateStore::new();
assert!(store.delete("x", "test").is_none());
}
// `restore` rolls back both the state and the transition log length.
#[test]
fn snapshot_and_restore() {
let store = StateStore::new();
store.set("x", Value::from(1), "a");
let snap = store.snapshot();
let tc = store.transition_count();
store.set("y", Value::from(2), "b");
assert!(store.exists("y"));
store.restore(snap, tc);
assert!(store.exists("x"));
assert!(!store.exists("y"));
assert_eq!(store.transition_count(), 1);
}
// Every write appends one transition, in order.
#[test]
fn transitions_logged() {
let store = StateStore::new();
store.set("a", Value::from(1), "act1");
store.set("b", Value::from(2), "act2");
let transitions = store.transitions();
assert_eq!(transitions.len(), 2);
assert_eq!(transitions[0].key, "a");
assert_eq!(transitions[1].key, "b");
}
// `transitions_since` returns only entries at or after the given index.
#[test]
fn transitions_since() {
let store = StateStore::new();
store.set("a", Value::from(1), "act1");
let idx = store.transition_count();
store.set("b", Value::from(2), "act2");
let since = store.transitions_since(idx);
assert_eq!(since.len(), 1);
assert_eq!(since[0].key, "b");
}
// Overwriting a key records the previous value as `old_value`.
#[test]
fn transition_records_old_value() {
let store = StateStore::new();
store.set("x", Value::from(1), "first");
store.set("x", Value::from(2), "second");
let transitions = store.transitions();
assert_eq!(transitions[1].old_value, Some(Value::from(1)));
assert_eq!(transitions[1].new_value, Some(Value::from(2)));
}
// `keys` lists every live key (sorted here because order is unspecified).
#[test]
fn keys() {
let store = StateStore::new();
store.set("a", Value::from(1), "t");
store.set("b", Value::from(2), "t");
let mut keys = store.keys();
keys.sort();
assert_eq!(keys, vec!["a", "b"]);
}
// An index past the end of a truncated log yields an empty result, not a panic.
#[test]
fn transitions_since_after_restore_does_not_panic() {
let store = StateStore::new();
store.set("a", serde_json::json!(1), "test");
store.set("b", serde_json::json!(2), "test");
let count_before = store.transition_count();
store.restore(HashMap::new(), 0);
let result = store.transitions_since(count_before);
assert!(result.is_empty());
}
// Marking the log length and reading from that mark returns the later write.
#[test]
fn transitions_since_normal_usage() {
let store = StateStore::new();
store.set("a", serde_json::json!(1), "test");
let mark = store.transition_count();
store.set("b", serde_json::json!(2), "test");
let since = store.transitions_since(mark);
assert_eq!(since.len(), 1);
assert_eq!(since[0].key, "b");
}
// `replace_all` swaps the state and clears the transition log.
#[test]
fn replace_all_swaps_state_without_transitions() {
let store = StateStore::new();
store.set("old_key", serde_json::json!("old"), "setup");
let mut new_state = HashMap::new();
new_state.insert("new_key".to_string(), serde_json::json!("new"));
store.replace_all(new_state);
assert_eq!(store.get("new_key"), Some(serde_json::json!("new")));
assert_eq!(store.get("old_key"), None);
assert_eq!(store.transition_count(), 0);
}
// Values written to a durable store are visible after reopening the journal.
#[test]
fn durable_store_survives_reopen() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("state.jsonl");
{
let store = StateStore::durable(&path).unwrap();
store.set("agent", serde_json::json!("planner"), "boot");
store.set("turns", serde_json::json!(42), "tick");
store.sync().unwrap();
}
let store = StateStore::durable(&path).unwrap();
assert_eq!(store.get("agent"), Some(serde_json::json!("planner")));
assert_eq!(store.get("turns"), Some(serde_json::json!(42)));
}
// A journaled delete is replayed, so the key stays gone after reopening.
#[test]
fn durable_store_replays_deletes() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("state.jsonl");
{
let store = StateStore::durable(&path).unwrap();
store.set("transient", serde_json::json!("x"), "boot");
store.delete("transient", "rm");
store.sync().unwrap();
}
let store = StateStore::durable(&path).unwrap();
assert!(!store.exists("transient"));
}
// Reaping removes only keys whose TTL has elapsed at the supplied time.
#[test]
fn ttl_reap_drops_expired_and_keeps_fresh() {
let store = StateStore::new();
store.set_with_ttl("short", serde_json::json!(1), "set", 0);
store.set_with_ttl("long", serde_json::json!(2), "set", 3600);
store.set("forever", serde_json::json!(3), "set");
let reaped = store
.reap_expired(Utc::now() + Duration::seconds(10))
.unwrap();
assert_eq!(reaped, vec!["short".to_string()]);
assert!(!store.exists("short"));
assert_eq!(store.get("long"), Some(serde_json::json!(2)));
assert_eq!(store.get("forever"), Some(serde_json::json!(3)));
}
// Reaping a durable store compacts the journal file and keeps survivors.
#[test]
fn durable_ttl_compacts_journal() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("state.jsonl");
{
let store = StateStore::durable(&path).unwrap();
for i in 0..50 {
store.set_with_ttl(&format!("k{i}"), serde_json::json!(i), "set", 0);
}
store.set("survivor", serde_json::json!("kept"), "set");
store.sync().unwrap();
let pre = std::fs::metadata(&path).unwrap().len();
let reaped = store
.reap_expired(Utc::now() + Duration::seconds(1))
.unwrap();
assert_eq!(reaped.len(), 50);
store.sync().unwrap();
let post = std::fs::metadata(&path).unwrap().len();
assert!(
post < pre,
"post={post} pre={pre} — compaction did not shrink"
);
}
let store = StateStore::durable(&path).unwrap();
assert!(!store.exists("k0"));
assert!(!store.exists("k49"));
assert_eq!(store.get("survivor"), Some(serde_json::json!("kept")));
}
// A later write without TTL supersedes an earlier TTL for the same key.
#[test]
fn ttl_then_rewrite_without_ttl_does_not_reap() {
let store = StateStore::new();
store.set_with_ttl("k", serde_json::json!("a"), "first", 0);
store.set("k", serde_json::json!("b"), "second"); let reaped = store
.reap_expired(Utc::now() + Duration::seconds(10))
.unwrap();
assert!(reaped.is_empty());
assert_eq!(store.get("k"), Some(serde_json::json!("b")));
}
// A non-JSON line in the journal is skipped; surrounding records still load.
#[test]
fn malformed_journal_line_is_skipped_not_fatal() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("state.jsonl");
{
std::fs::write(
&path,
"{\"key\":\"a\",\"old_value\":null,\"new_value\":1,\"action_id\":\"x\",\"timestamp\":\"2026-05-11T00:00:00Z\"}\n\
not-json\n\
{\"key\":\"b\",\"old_value\":null,\"new_value\":2,\"action_id\":\"x\",\"timestamp\":\"2026-05-11T00:00:00Z\"}\n",
)
.unwrap();
}
let store = StateStore::durable(&path).unwrap();
assert_eq!(store.get("a"), Some(serde_json::json!(1)));
assert_eq!(store.get("b"), Some(serde_json::json!(2)));
}
// Two tenants can hold different values under the same caller-visible key.
#[test]
fn scoped_view_writes_isolate_between_tenants() {
let store = StateStore::new();
store.scoped(Some("acme")).set("config", json!("A"), "act");
store
.scoped(Some("globex"))
.set("config", json!("G"), "act");
assert_eq!(store.scoped(Some("acme")).get("config"), Some(json!("A")));
assert_eq!(store.scoped(Some("globex")).get("config"), Some(json!("G")));
}
// A key written by one tenant does not exist for another tenant.
#[test]
fn scoped_view_isolates_existence_check() {
let store = StateStore::new();
store.scoped(Some("acme")).set("k", json!(1), "act");
assert!(store.scoped(Some("acme")).exists("k"));
assert!(!store.scoped(Some("globex")).exists("k"));
}
// A tenant view lists only its own keys, with the prefix stripped.
#[test]
fn scoped_view_keys_filters_to_tenant() {
let store = StateStore::new();
store.scoped(Some("acme")).set("a", json!(1), "act");
store.scoped(Some("acme")).set("b", json!(2), "act");
store.scoped(Some("globex")).set("g", json!(9), "act");
store.set("unscoped", json!(0), "act");
let mut acme_keys = store.scoped(Some("acme")).keys();
acme_keys.sort();
assert_eq!(acme_keys, vec!["a", "b"]);
let globex_keys = store.scoped(Some("globex")).keys();
assert_eq!(globex_keys, vec!["g"]);
}
// The unscoped view hides tenant-prefixed keys from `keys`.
#[test]
fn unscoped_view_skips_tenant_prefixed_keys() {
let store = StateStore::new();
store.set("legacy", json!("ok"), "act");
store.scoped(Some("acme")).set("hidden", json!(42), "act");
let unscoped = store.scoped(None).keys();
assert_eq!(unscoped, vec!["legacy"]);
assert!(store.scoped(None).get("hidden").is_none());
}
// Deleting through one tenant's view leaves the other tenant's key intact.
#[test]
fn scoped_view_delete_doesnt_touch_other_tenants() {
let store = StateStore::new();
store.scoped(Some("acme")).set("shared", json!(1), "act");
store.scoped(Some("globex")).set("shared", json!(2), "act");
store.scoped(Some("acme")).delete("shared", "act");
assert!(!store.scoped(Some("acme")).exists("shared"));
assert!(store.scoped(Some("globex")).exists("shared"));
}
// An empty tenant string behaves like no tenant: keys are stored unprefixed.
#[test]
fn empty_tenant_string_treated_as_unscoped() {
let store = StateStore::new();
store.scoped(Some("")).set("k", json!(1), "act");
assert_eq!(store.get("k"), Some(json!(1)));
assert_eq!(store.scoped(None).get("k"), Some(json!(1)));
}
}