Skip to main content

car_state/
lib.rs

1//! State management for Common Agent Runtime.
2//!
3//! Provides structured, typed state with transition logging.
4//! Every mutation produces a StateTransition record for audit and replay.
5//!
6//! ## Persistence (Parslee-ai/car#181)
7//!
8//! `StateStore::durable(path)` opens a JSONL-backed store. Each
9//! mutation appends a transition line; on construction the file is
10//! replayed to rebuild current state. This is the agent-persistence
11//! pattern documented in `docs/persistence.md`. JSONL was chosen over
12//! sqlite/sled to stay aligned with the existing JSONL persistence
13//! used by `car-eventlog` and `car-memgine` — one file shape, one
14//! reap+compact story, no native build deps.
15//!
16//! Per-key TTL is supported via `set_with_ttl` — the in-memory state
17//! drops the key when `reap_expired(now)` runs after the deadline.
18//! The on-disk file is compacted at the same time so the journal
19//! doesn't grow unbounded.
20
21use chrono::{DateTime, Duration, Utc};
22use parking_lot::Mutex;
23use serde::{Deserialize, Serialize};
24use serde_json::Value;
25use std::collections::HashMap;
26use std::fs::{File, OpenOptions};
27use std::io::{BufRead, BufReader, BufWriter, Write};
28use std::path::{Path, PathBuf};
29
/// An explicit record of a state change.
///
/// One record is produced for every write, delete, and TTL reap; durable
/// stores also serialize each write/delete record as one JSONL line.
///
/// `ttl_secs` is optional — when present, the key expires `ttl_secs`
/// seconds after `timestamp`. Reads return the value while it's
/// live; `reap_expired` drops it after the deadline. The default
/// (None) means "keep until explicitly deleted."
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct StateTransition {
    /// Key that changed, exactly as stored (tenant-scoped writes carry
    /// their `tenant:<id>:` prefix).
    pub key: String,
    /// Value held before the change; `None` when the key was absent.
    pub old_value: Option<Value>,
    /// Value held after the change; `None` marks a removal.
    pub new_value: Option<Value>,
    /// Caller-supplied identifier of the action that caused the change.
    /// The store itself uses `"reap"` and `"compact"` for its own records.
    pub action_id: String,
    /// When the change was recorded; TTL deadlines are measured from here.
    pub timestamp: DateTime<Utc>,
    /// Optional time-to-live in seconds, counted from `timestamp`.
    /// Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub ttl_secs: Option<u64>,
    /// The key's monotonic version *after* this transition. Persisted so
    /// the version counter survives journal compaction and restart — a
    /// compacted journal collapses a key's history to one line, so without
    /// this field replay would recount from 1 and break the staleness
    /// guarantee (neo review M2). Optional for backward-compatible reads of
    /// pre-versioning journals.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub version: Option<u64>,
}
54
/// Thread-safe state store with transition logging.
///
/// All reads and writes go through this store. Every write produces a
/// StateTransition record for audit and replay. Optionally backed by
/// a JSONL journal file for durability across process restarts (see
/// [`StateStore::durable`]).
///
/// Each field sits behind its own mutex. Every method that takes more
/// than one of them acquires `state` first, with the exception of
/// `compact_journal`.
pub struct StateStore {
    /// Current key → value map; the source of truth for reads.
    state: Mutex<HashMap<String, Value>>,
    /// Append-only in-memory log of every recorded transition, in the
    /// order the changes were applied.
    transitions: Mutex<Vec<StateTransition>>,
    /// Monotonic per-key version counter, bumped on every write/delete.
    /// The basis for transactional staleness detection (survey §5.2.4):
    /// an action that read `k` at version `v` can be flagged when `k` has
    /// since advanced past `v`, catching belief divergence that a value
    /// comparison alone would miss (e.g. set back to the same value).
    versions: Mutex<HashMap<String, u64>>,
    /// Optional JSONL-backed durability layer. When set, every
    /// `StateTransition` appended to the in-memory log is also
    /// appended to this file's open writer; `reap_expired` rewrites
    /// the file to compact away dropped keys.
    journal: Mutex<Option<Journal>>,
}
76
/// Open handle on the on-disk JSONL journal of a durable [`StateStore`].
struct Journal {
    /// Location of the journal file; compaction writes a sibling temp
    /// file and renames it over this path.
    path: PathBuf,
    /// Buffered writer over the journal file, opened in append mode.
    writer: BufWriter<File>,
}
81
82impl StateStore {
83    pub fn new() -> Self {
84        Self {
85            state: Mutex::new(HashMap::new()),
86            transitions: Mutex::new(Vec::new()),
87            versions: Mutex::new(HashMap::new()),
88            journal: Mutex::new(None),
89        }
90    }
91
92    /// Open a durable, JSONL-backed StateStore. If the file exists,
93    /// its transitions are replayed (last-write-wins per key, with
94    /// TTLs honored) to rebuild current state. Subsequent writes
95    /// append to the same file.
96    ///
97    /// Returns an error only on filesystem-level failures (parent
98    /// directory missing, permission denied, etc.). Malformed lines
99    /// inside the journal are skipped with a warning rather than
100    /// failing the open — agent persistence shouldn't refuse to
101    /// start over a single bad line.
102    pub fn durable(path: impl Into<PathBuf>) -> std::io::Result<Self> {
103        let path = path.into();
104        if let Some(parent) = path.parent() {
105            if !parent.as_os_str().is_empty() {
106                std::fs::create_dir_all(parent)?;
107            }
108        }
109        let store = Self::new();
110        store.replay_journal(&path)?;
111        let file = OpenOptions::new().create(true).append(true).open(&path)?;
112        *store.journal.lock() = Some(Journal {
113            path,
114            writer: BufWriter::new(file),
115        });
116        Ok(store)
117    }
118
119    fn replay_journal(&self, path: &Path) -> std::io::Result<()> {
120        if !path.exists() {
121            return Ok(());
122        }
123        let file = File::open(path)?;
124        let reader = BufReader::new(file);
125        let now = Utc::now();
126        let mut state = self.state.lock();
127        let mut transitions = self.transitions.lock();
128        let mut versions = self.versions.lock();
129        for line in reader.lines() {
130            let line = match line {
131                Ok(l) if l.trim().is_empty() => continue,
132                Ok(l) => l,
133                Err(_) => continue,
134            };
135            let Ok(t) = serde_json::from_str::<StateTransition>(&line) else {
136                // Malformed line. Don't refuse to boot over it.
137                tracing::warn!(
138                    journal = %path.display(),
139                    "skipping malformed StateStore journal line"
140                );
141                continue;
142            };
143            // Replay last-write-wins. TTLs that already expired are
144            // dropped at replay time so we don't surface stale data
145            // on first read.
146            if let (Some(ttl), Some(value)) = (t.ttl_secs, &t.new_value) {
147                if now.signed_duration_since(t.timestamp) > Duration::seconds(ttl as i64) {
148                    state.remove(&t.key);
149                } else {
150                    state.insert(t.key.clone(), value.clone());
151                }
152            } else if let Some(value) = &t.new_value {
153                state.insert(t.key.clone(), value.clone());
154            } else {
155                state.remove(&t.key);
156            }
157            // Restore the persisted version when present (survives
158            // compaction); otherwise count transitions for legacy
159            // pre-versioning journals. Take the max so the counter stays
160            // monotonic across a mix of compacted and appended lines.
161            let entry = versions.entry(t.key.clone()).or_insert(0);
162            let restored = t.version.unwrap_or(*entry + 1);
163            *entry = (*entry).max(restored);
164            transitions.push(t);
165        }
166        Ok(())
167    }
168
169    fn append_journal(&self, transition: &StateTransition) {
170        let mut journal = self.journal.lock();
171        let Some(journal) = journal.as_mut() else {
172            return;
173        };
174        // Best-effort: a failed disk write tracing::warn!s but the
175        // in-memory write already succeeded. Callers who need
176        // guaranteed durability should call `sync` after batches.
177        let Ok(json) = serde_json::to_string(transition) else {
178            return;
179        };
180        if let Err(e) = writeln!(journal.writer, "{json}") {
181            tracing::warn!(
182                journal = %journal.path.display(),
183                error = %e,
184                "StateStore journal append failed"
185            );
186            return;
187        }
188        let _ = journal.writer.flush();
189    }
190
191    /// Fsync the journal writer. Call after a batch of writes when
192    /// you need durability guarantees beyond best-effort flush.
193    pub fn sync(&self) -> std::io::Result<()> {
194        let mut journal = self.journal.lock();
195        let Some(journal) = journal.as_mut() else {
196            return Ok(());
197        };
198        journal.writer.flush()?;
199        journal.writer.get_ref().sync_all()
200    }
201
202    /// Drop expired keys (per `ttl_secs` on their last write) and
203    /// rewrite the journal as a compacted snapshot of the surviving
204    /// state. Returns the keys that were reaped.
205    ///
206    /// **TTL semantics**: a `ttl_secs` of 0 means "expired
207    /// immediately" — the key is reapable on the next call. There
208    /// is no "0 = forever" sentinel; use `set` (no TTL) for keys
209    /// that should never auto-expire.
210    ///
211    /// Latest-write-wins: a key rewritten WITHOUT a TTL after a
212    /// TTL'd write is NOT reaped — the more recent write
213    /// effectively cancels the TTL.
214    ///
215    /// Single-pass over the transitions log via a key→latest
216    /// index, so cost is O(n) in journal length (not O(n²)).
217    pub fn reap_expired(&self, now: DateTime<Utc>) -> std::io::Result<Vec<String>> {
218        let mut state = self.state.lock();
219        let mut transitions = self.transitions.lock();
220        // Build a single-pass index of the latest transition per
221        // key. Walking the whole log once is unavoidable; doing it
222        // ONCE keeps reap O(n) in journal length.
223        let mut latest_by_key: HashMap<&str, &StateTransition> = HashMap::new();
224        for t in transitions.iter() {
225            latest_by_key.insert(t.key.as_str(), t);
226        }
227        let expired: Vec<String> = latest_by_key
228            .iter()
229            .filter_map(|(_k, t)| {
230                let ttl = t.ttl_secs?;
231                if t.new_value.is_none() {
232                    return None;
233                }
234                let age = now.signed_duration_since(t.timestamp);
235                (age > Duration::seconds(ttl as i64)).then(|| t.key.clone())
236            })
237            .collect();
238        let mut reaped = Vec::new();
239        for key in expired {
240            if state.remove(&key).is_some() {
241                // Bump the version so an expiry is observable as a change
242                // (neo review N1: keeps in-memory versions in step with
243                // what replay would reconstruct).
244                let version = self.bump_version(&key);
245                reaped.push(key.clone());
246                transitions.push(StateTransition {
247                    key,
248                    old_value: None,
249                    new_value: None,
250                    action_id: "reap".to_string(),
251                    timestamp: now,
252                    ttl_secs: None,
253                    version: Some(version),
254                });
255            }
256        }
257        drop(state);
258        drop(transitions);
259        if !reaped.is_empty() {
260            self.compact_journal()?;
261        }
262        Ok(reaped)
263    }
264
265    /// Rewrite the journal as a flat snapshot of the current state —
266    /// one transition per surviving key, no replay history. Reduces
267    /// journal size without changing observable behavior.
268    ///
269    /// **Concurrency requirement**: callers MUST hold the
270    /// observation that no other thread is mid-`set`/`delete` on
271    /// this store; the snapshot is taken under the state lock, but
272    /// in-flight journal appends to the *old* file handle that
273    /// land between snapshot and rename are lost. Today's only
274    /// caller is `reap_expired`, which holds both locks across the
275    /// call; external callers should serialize themselves.
276    pub(crate) fn compact_journal(&self) -> std::io::Result<()> {
277        let mut journal = self.journal.lock();
278        let Some(j) = journal.as_mut() else {
279            return Ok(());
280        };
281        let state = self.state.lock().clone();
282        let versions = self.versions.lock().clone();
283        let tmp_path = j.path.with_extension("jsonl.tmp");
284        {
285            let tmp_file = File::create(&tmp_path)?;
286            let mut writer = BufWriter::new(tmp_file);
287            for (key, value) in &state {
288                let t = StateTransition {
289                    key: key.clone(),
290                    old_value: None,
291                    new_value: Some(value.clone()),
292                    action_id: "compact".to_string(),
293                    timestamp: Utc::now(),
294                    ttl_secs: None,
295                    // Persist the true version so replay restores it rather
296                    // than recounting the collapsed history from 1.
297                    version: versions.get(key).copied(),
298                };
299                let line = serde_json::to_string(&t)?;
300                writeln!(writer, "{line}")?;
301            }
302            writer.flush()?;
303            writer.get_ref().sync_all()?;
304        }
305        std::fs::rename(&tmp_path, &j.path)?;
306        let file = OpenOptions::new().create(true).append(true).open(&j.path)?;
307        j.writer = BufWriter::new(file);
308        Ok(())
309    }
310
311    pub fn get(&self, key: &str) -> Option<Value> {
312        self.state.lock().get(key).cloned()
313    }
314
315    pub fn get_or(&self, key: &str, default: Value) -> Value {
316        self.state.lock().get(key).cloned().unwrap_or(default)
317    }
318
319    pub fn exists(&self, key: &str) -> bool {
320        self.state.lock().contains_key(key)
321    }
322
323    pub fn set(&self, key: &str, value: Value, action_id: &str) -> StateTransition {
324        self.set_inner(key, value, action_id, None)
325    }
326
327    /// Set a key with a TTL (seconds from now). `reap_expired`
328    /// drops the key once the deadline passes; re-setting the key
329    /// without a TTL (`set`) cancels the TTL.
330    ///
331    /// `ttl_secs == 0` means "expire immediately" (reapable on the
332    /// next `reap_expired` call). It is NOT a "no expiry" sentinel
333    /// — use the plain `set(...)` method for keys that should
334    /// never auto-expire. This differs from the Unix/Redis
335    /// convention; the distinction matters because a TTL passed
336    /// from untrusted input could otherwise silently mean
337    /// "forever" when the caller intended "never store."
338    pub fn set_with_ttl(
339        &self,
340        key: &str,
341        value: Value,
342        action_id: &str,
343        ttl_secs: u64,
344    ) -> StateTransition {
345        self.set_inner(key, value, action_id, Some(ttl_secs))
346    }
347
348    fn set_inner(
349        &self,
350        key: &str,
351        value: Value,
352        action_id: &str,
353        ttl_secs: Option<u64>,
354    ) -> StateTransition {
355        let mut state = self.state.lock();
356        let old = state.get(key).cloned();
357        state.insert(key.to_string(), value.clone());
358        let version = self.bump_version(key);
359
360        let t = StateTransition {
361            key: key.to_string(),
362            old_value: old,
363            new_value: Some(value),
364            action_id: action_id.to_string(),
365            timestamp: Utc::now(),
366            ttl_secs,
367            version: Some(version),
368        };
369
370        self.transitions.lock().push(t.clone());
371        self.append_journal(&t);
372        t
373    }
374
375    /// Increment the monotonic version counter for `key`, returning the new
376    /// version.
377    fn bump_version(&self, key: &str) -> u64 {
378        let mut versions = self.versions.lock();
379        let v = versions.entry(key.to_string()).or_insert(0);
380        *v += 1;
381        *v
382    }
383
384    /// Current version of `key` (number of writes/deletes applied to it),
385    /// or `None` if it was never written. Used by the transactional
386    /// conflict checker to detect stale reads (survey §5.2.4).
387    pub fn version(&self, key: &str) -> Option<u64> {
388        self.versions.lock().get(key).copied()
389    }
390
391    /// Snapshot of all key versions — the version map an action's
392    /// assumptions are checked against.
393    pub fn versions(&self) -> HashMap<String, u64> {
394        self.versions.lock().clone()
395    }
396
397    /// Atomic snapshot of both the current values and the current versions,
398    /// taken under a single consistent lock acquisition (state then
399    /// versions, matching the write path) so the two maps can't tear — a
400    /// caller never sees a value from version N+1 paired with version N
401    /// (neo review N2). This is the pair the transactional conflict checker
402    /// (`car_verify::check_transaction`) should consume.
403    pub fn versioned_snapshot(&self) -> (HashMap<String, Value>, HashMap<String, u64>) {
404        let state = self.state.lock();
405        let versions = self.versions.lock();
406        (state.clone(), versions.clone())
407    }
408
409    pub fn delete(&self, key: &str, action_id: &str) -> Option<StateTransition> {
410        let mut state = self.state.lock();
411        let old = state.remove(key)?;
412        let version = self.bump_version(key);
413
414        let t = StateTransition {
415            key: key.to_string(),
416            old_value: Some(old),
417            new_value: None,
418            action_id: action_id.to_string(),
419            timestamp: Utc::now(),
420            ttl_secs: None,
421            version: Some(version),
422        };
423
424        self.transitions.lock().push(t.clone());
425        self.append_journal(&t);
426        Some(t)
427    }
428
429    /// Deep clone of current state.
430    pub fn snapshot(&self) -> HashMap<String, Value> {
431        self.state.lock().clone()
432    }
433
434    /// Restore state from a snapshot, truncating transitions.
435    pub fn restore(&self, snapshot: HashMap<String, Value>, transition_count: usize) {
436        *self.state.lock() = snapshot;
437        self.transitions.lock().truncate(transition_count);
438    }
439
440    pub fn transition_count(&self) -> usize {
441        self.transitions.lock().len()
442    }
443
444    pub fn transitions(&self) -> Vec<StateTransition> {
445        self.transitions.lock().clone()
446    }
447
448    pub fn transitions_since(&self, index: usize) -> Vec<StateTransition> {
449        let transitions = self.transitions.lock();
450        let start = index.min(transitions.len());
451        transitions[start..].to_vec()
452    }
453
454    pub fn keys(&self) -> Vec<String> {
455        self.state.lock().keys().cloned().collect()
456    }
457
458    /// Replace the entire state map without recording transitions.
459    /// Used by checkpoint restore to avoid synthetic transition history.
460    /// Also clears the transitions log so callers of `transitions_since()`
461    /// don't see stale history from the discarded state.
462    pub fn replace_all(&self, snapshot: HashMap<String, Value>) {
463        *self.state.lock() = snapshot;
464        self.transitions.lock().clear();
465    }
466
467    /// Build a tenant-scoped view over this store
468    /// (Parslee-ai/car#187 phase 3 enforcement).
469    ///
470    /// All reads / writes go through `tenant:<tenant_id>:<key>` so
471    /// distinct tenants can't see each other's keys. `tenant = None`
472    /// returns a view that hits the unscoped (legacy) namespace —
473    /// callers that don't yet have a `RuntimeScope` get pre-#187
474    /// behaviour automatically.
475    ///
476    /// Cheap to construct; holds a `&self` borrow plus the tenant
477    /// string. The view's methods take the parking-lot lock the same
478    /// way the unscoped methods do.
479    pub fn scoped<'a>(&'a self, tenant: Option<&'a str>) -> ScopedStateView<'a> {
480        ScopedStateView {
481            store: self,
482            tenant,
483        }
484    }
485}
486
/// Tenant-scoped view over a [`StateStore`]. All key arguments are
/// transparently prefixed with `tenant:<tenant_id>:` before hitting
/// the underlying store; on the way out, the prefix is stripped so
/// callers see their original keys.
///
/// Construct via [`StateStore::scoped`]. When `tenant` is `None` (or an
/// empty string), the prefix is empty and the view is functionally
/// equivalent to the unscoped methods on `StateStore` — useful for code
/// paths that always go through this view regardless of whether scope is
/// active.
///
/// # Isolation guarantee
///
/// Two views with distinct `tenant` strings cannot observe each
/// other's writes through `get` / `exists` / `keys`. The transitions
/// log still records the full (prefixed) key so audit / replay sees
/// the actual storage layout.
///
/// NOTE(review): the prefix is built by plain concatenation, so tenant
/// `a:b` with key `c` and tenant `a` with key `b:c` both map to
/// `tenant:a:b:c`. The guarantee above therefore presumably relies on
/// tenant ids never containing `:` — confirm where ids are validated.
///
/// # What isolation does *not* cover (phase 3 follow-ups)
///
/// - `StateStore::snapshot` / `restore` are deliberately unscoped —
///   they're called at proposal boundaries for rollback and need to
///   see the whole map. Per-tenant partial rollback is a known
///   concurrency hole when multiple proposals run interleaved; the
///   pre-#187 baseline has the same issue, and fixing it cleanly
///   requires either serializing per-tenant or extending the
///   transactional model. Tracked as a follow-up.
/// - The journal file (when durability is on) records full
///   prefixed keys. Operators rotating tenants out can grep the
///   journal by prefix.
pub struct ScopedStateView<'a> {
    /// The shared store every call is forwarded to.
    store: &'a StateStore,
    /// Tenant id used to build the key prefix; `None` or `""` selects the
    /// unprefixed namespace.
    tenant: Option<&'a str>,
}
521
522impl<'a> ScopedStateView<'a> {
523    fn full_key(&self, key: &str) -> String {
524        match self.tenant {
525            Some(t) if !t.is_empty() => format!("tenant:{t}:{key}"),
526            _ => key.to_string(),
527        }
528    }
529
530    fn strip_prefix<'k>(&self, full: &'k str) -> Option<&'k str> {
531        match self.tenant {
532            Some(t) if !t.is_empty() => {
533                let prefix = format!("tenant:{t}:");
534                full.strip_prefix(&prefix)
535            }
536            _ => Some(full),
537        }
538    }
539
540    pub fn get(&self, key: &str) -> Option<Value> {
541        self.store.get(&self.full_key(key))
542    }
543
544    pub fn get_or(&self, key: &str, default: Value) -> Value {
545        self.store.get_or(&self.full_key(key), default)
546    }
547
548    pub fn exists(&self, key: &str) -> bool {
549        self.store.exists(&self.full_key(key))
550    }
551
552    pub fn set(&self, key: &str, value: Value, action_id: &str) -> StateTransition {
553        self.store.set(&self.full_key(key), value, action_id)
554    }
555
556    pub fn set_with_ttl(
557        &self,
558        key: &str,
559        value: Value,
560        action_id: &str,
561        ttl_secs: u64,
562    ) -> StateTransition {
563        self.store
564            .set_with_ttl(&self.full_key(key), value, action_id, ttl_secs)
565    }
566
567    pub fn delete(&self, key: &str, action_id: &str) -> Option<StateTransition> {
568        self.store.delete(&self.full_key(key), action_id)
569    }
570
571    /// Return keys belonging to this tenant only, with the
572    /// `tenant:<id>:` prefix stripped so callers see their original
573    /// key names. Unscoped views (no tenant) return only keys that
574    /// don't start with `tenant:` — preventing accidental visibility
575    /// of scoped state through a legacy code path.
576    pub fn keys(&self) -> Vec<String> {
577        self.store
578            .keys()
579            .into_iter()
580            .filter_map(|k| {
581                if self.tenant.map(|t| !t.is_empty()).unwrap_or(false) {
582                    self.strip_prefix(&k).map(str::to_string)
583                } else if k.starts_with("tenant:") {
584                    None
585                } else {
586                    Some(k)
587                }
588            })
589            .collect()
590    }
591}
592
593impl Default for StateStore {
594    fn default() -> Self {
595        Self::new()
596    }
597}
598
599impl car_ir::precondition::StateView for StateStore {
600    fn get_value(&self, key: &str) -> Option<Value> {
601        self.get(key)
602    }
603    fn key_exists(&self, key: &str) -> bool {
604        self.exists(key)
605    }
606}
607
608#[cfg(test)]
609mod tests {
610    use super::*;
611    use serde_json::json;
612
613    #[test]
614    fn set_and_get() {
615        let store = StateStore::new();
616        store.set("x", Value::from(42), "test");
617        assert_eq!(store.get("x"), Some(Value::from(42)));
618    }
619
620    #[test]
621    fn exists() {
622        let store = StateStore::new();
623        assert!(!store.exists("x"));
624        store.set("x", Value::from(1), "test");
625        assert!(store.exists("x"));
626    }
627
628    #[test]
629    fn delete() {
630        let store = StateStore::new();
631        store.set("x", Value::from(1), "test");
632        let t = store.delete("x", "test");
633        assert!(t.is_some());
634        assert!(!store.exists("x"));
635    }
636
637    #[test]
638    fn delete_nonexistent() {
639        let store = StateStore::new();
640        assert!(store.delete("x", "test").is_none());
641    }
642
643    #[test]
644    fn snapshot_and_restore() {
645        let store = StateStore::new();
646        store.set("x", Value::from(1), "a");
647        let snap = store.snapshot();
648        let tc = store.transition_count();
649
650        store.set("y", Value::from(2), "b");
651        assert!(store.exists("y"));
652
653        store.restore(snap, tc);
654        assert!(store.exists("x"));
655        assert!(!store.exists("y"));
656        assert_eq!(store.transition_count(), 1);
657    }
658
659    #[test]
660    fn transitions_logged() {
661        let store = StateStore::new();
662        store.set("a", Value::from(1), "act1");
663        store.set("b", Value::from(2), "act2");
664
665        let transitions = store.transitions();
666        assert_eq!(transitions.len(), 2);
667        assert_eq!(transitions[0].key, "a");
668        assert_eq!(transitions[1].key, "b");
669    }
670
671    #[test]
672    fn transitions_since() {
673        let store = StateStore::new();
674        store.set("a", Value::from(1), "act1");
675        let idx = store.transition_count();
676        store.set("b", Value::from(2), "act2");
677
678        let since = store.transitions_since(idx);
679        assert_eq!(since.len(), 1);
680        assert_eq!(since[0].key, "b");
681    }
682
683    #[test]
684    fn transition_records_old_value() {
685        let store = StateStore::new();
686        store.set("x", Value::from(1), "first");
687        store.set("x", Value::from(2), "second");
688
689        let transitions = store.transitions();
690        assert_eq!(transitions[1].old_value, Some(Value::from(1)));
691        assert_eq!(transitions[1].new_value, Some(Value::from(2)));
692    }
693
694    #[test]
695    fn keys() {
696        let store = StateStore::new();
697        store.set("a", Value::from(1), "t");
698        store.set("b", Value::from(2), "t");
699        let mut keys = store.keys();
700        keys.sort();
701        assert_eq!(keys, vec!["a", "b"]);
702    }
703
704    #[test]
705    fn transitions_since_after_restore_does_not_panic() {
706        let store = StateStore::new();
707        store.set("a", serde_json::json!(1), "test");
708        store.set("b", serde_json::json!(2), "test");
709        let count_before = store.transition_count(); // 2
710
711        // Restore to empty, truncating transitions to 0
712        store.restore(HashMap::new(), 0);
713
714        // Using the stale count_before (2) should not panic
715        let result = store.transitions_since(count_before);
716        assert!(result.is_empty());
717    }
718
719    #[test]
720    fn transitions_since_normal_usage() {
721        let store = StateStore::new();
722        store.set("a", serde_json::json!(1), "test");
723        let mark = store.transition_count();
724        store.set("b", serde_json::json!(2), "test");
725        let since = store.transitions_since(mark);
726        assert_eq!(since.len(), 1);
727        assert_eq!(since[0].key, "b");
728    }
729
730    #[test]
731    fn replace_all_swaps_state_without_transitions() {
732        let store = StateStore::new();
733        store.set("old_key", serde_json::json!("old"), "setup");
734
735        let mut new_state = HashMap::new();
736        new_state.insert("new_key".to_string(), serde_json::json!("new"));
737        store.replace_all(new_state);
738
739        assert_eq!(store.get("new_key"), Some(serde_json::json!("new")));
740        assert_eq!(store.get("old_key"), None);
741        // After replace_all, transitions should be cleared (not preserved)
742        assert_eq!(store.transition_count(), 0);
743    }
744
745    #[test]
746    fn durable_store_survives_reopen() {
747        let dir = tempfile::tempdir().unwrap();
748        let path = dir.path().join("state.jsonl");
749        {
750            let store = StateStore::durable(&path).unwrap();
751            store.set("agent", serde_json::json!("planner"), "boot");
752            store.set("turns", serde_json::json!(42), "tick");
753            store.sync().unwrap();
754        }
755        let store = StateStore::durable(&path).unwrap();
756        assert_eq!(store.get("agent"), Some(serde_json::json!("planner")));
757        assert_eq!(store.get("turns"), Some(serde_json::json!(42)));
758    }
759
760    #[test]
761    fn durable_store_replays_deletes() {
762        let dir = tempfile::tempdir().unwrap();
763        let path = dir.path().join("state.jsonl");
764        {
765            let store = StateStore::durable(&path).unwrap();
766            store.set("transient", serde_json::json!("x"), "boot");
767            store.delete("transient", "rm");
768            store.sync().unwrap();
769        }
770        let store = StateStore::durable(&path).unwrap();
771        assert!(!store.exists("transient"));
772    }
773
774    #[test]
775    fn ttl_reap_drops_expired_and_keeps_fresh() {
776        let store = StateStore::new();
777        store.set_with_ttl("short", serde_json::json!(1), "set", 0);
778        store.set_with_ttl("long", serde_json::json!(2), "set", 3600);
779        store.set("forever", serde_json::json!(3), "set");
780        // Now + 10s — short (ttl=0) is expired, long (ttl=3600) is fresh, forever has no TTL.
781        let reaped = store
782            .reap_expired(Utc::now() + Duration::seconds(10))
783            .unwrap();
784        assert_eq!(reaped, vec!["short".to_string()]);
785        assert!(!store.exists("short"));
786        assert_eq!(store.get("long"), Some(serde_json::json!(2)));
787        assert_eq!(store.get("forever"), Some(serde_json::json!(3)));
788    }
789
790    #[test]
791    fn durable_ttl_compacts_journal() {
792        let dir = tempfile::tempdir().unwrap();
793        let path = dir.path().join("state.jsonl");
794        {
795            let store = StateStore::durable(&path).unwrap();
796            for i in 0..50 {
797                store.set_with_ttl(&format!("k{i}"), serde_json::json!(i), "set", 0);
798            }
799            store.set("survivor", serde_json::json!("kept"), "set");
800            store.sync().unwrap();
801            let pre = std::fs::metadata(&path).unwrap().len();
802            // Force expiry by advancing the clock past the 0s TTL.
803            let reaped = store
804                .reap_expired(Utc::now() + Duration::seconds(1))
805                .unwrap();
806            assert_eq!(reaped.len(), 50);
807            store.sync().unwrap();
808            let post = std::fs::metadata(&path).unwrap().len();
809            // Compaction should shrink the journal: 50 TTL'd writes + 1
810            // survivor pre-compact is 51 lines; post-compact is 1 line.
811            assert!(
812                post < pre,
813                "post={post} pre={pre} — compaction did not shrink"
814            );
815        }
816        // Reopen — only the survivor remains.
817        let store = StateStore::durable(&path).unwrap();
818        assert!(!store.exists("k0"));
819        assert!(!store.exists("k49"));
820        assert_eq!(store.get("survivor"), Some(serde_json::json!("kept")));
821        // Version survives compaction (neo M2): survivor was written once,
822        // so its version is 1 after a compaction-then-reopen, not reset in
823        // a way that breaks staleness detection.
824        assert_eq!(store.version("survivor"), Some(1));
825    }
826
827    #[test]
828    fn version_is_monotonic_and_survives_compaction() {
829        let dir = tempfile::tempdir().unwrap();
830        let path = dir.path().join("v.jsonl");
831        {
832            let store = StateStore::durable(&path).unwrap();
833            for i in 0..3 {
834                store.set("cfg", serde_json::json!(i), "set");
835            }
836            assert_eq!(store.version("cfg"), Some(3));
837            // A TTL key that expires forces a compaction of the journal.
838            store.set_with_ttl("tmp", serde_json::json!(1), "set", 0);
839            store.sync().unwrap();
840            store.reap_expired(Utc::now() + Duration::seconds(1)).unwrap();
841            store.sync().unwrap();
842        }
843        // After compaction + restart, cfg's version must still be 3 — not
844        // recounted to 1 from the collapsed single line.
845        let store = StateStore::durable(&path).unwrap();
846        assert_eq!(store.version("cfg"), Some(3));
847    }
848
849    #[test]
850    fn reap_bumps_version() {
851        let store = StateStore::new();
852        store.set("k", serde_json::json!("v"), "set");
853        assert_eq!(store.version("k"), Some(1));
854        store.set_with_ttl("k", serde_json::json!("v2"), "set", 0);
855        assert_eq!(store.version("k"), Some(2));
856        store.reap_expired(Utc::now() + Duration::seconds(1)).unwrap();
857        // Expiry is an observable change → version advances (neo N1).
858        assert_eq!(store.version("k"), Some(3));
859    }
860
861    #[test]
862    fn ttl_then_rewrite_without_ttl_does_not_reap() {
863        let store = StateStore::new();
864        store.set_with_ttl("k", serde_json::json!("a"), "first", 0);
865        store.set("k", serde_json::json!("b"), "second"); // no TTL
866        let reaped = store
867            .reap_expired(Utc::now() + Duration::seconds(10))
868            .unwrap();
869        assert!(reaped.is_empty());
870        assert_eq!(store.get("k"), Some(serde_json::json!("b")));
871    }
872
873    #[test]
874    fn malformed_journal_line_is_skipped_not_fatal() {
875        let dir = tempfile::tempdir().unwrap();
876        let path = dir.path().join("state.jsonl");
877        // Plant a good line + a bad line + another good line.
878        {
879            std::fs::write(
880                &path,
881                "{\"key\":\"a\",\"old_value\":null,\"new_value\":1,\"action_id\":\"x\",\"timestamp\":\"2026-05-11T00:00:00Z\"}\n\
882                 not-json\n\
883                 {\"key\":\"b\",\"old_value\":null,\"new_value\":2,\"action_id\":\"x\",\"timestamp\":\"2026-05-11T00:00:00Z\"}\n",
884            )
885            .unwrap();
886        }
887        let store = StateStore::durable(&path).unwrap();
888        assert_eq!(store.get("a"), Some(serde_json::json!(1)));
889        assert_eq!(store.get("b"), Some(serde_json::json!(2)));
890    }
891
892    // ScopedStateView tests — Parslee-ai/car#187 phase 3 enforcement.
893
894    #[test]
895    fn scoped_view_writes_isolate_between_tenants() {
896        let store = StateStore::new();
897        store.scoped(Some("acme")).set("config", json!("A"), "act");
898        store
899            .scoped(Some("globex"))
900            .set("config", json!("G"), "act");
901
902        // Each tenant sees their own value.
903        assert_eq!(store.scoped(Some("acme")).get("config"), Some(json!("A")));
904        assert_eq!(store.scoped(Some("globex")).get("config"), Some(json!("G")));
905    }
906
907    #[test]
908    fn scoped_view_isolates_existence_check() {
909        let store = StateStore::new();
910        store.scoped(Some("acme")).set("k", json!(1), "act");
911        assert!(store.scoped(Some("acme")).exists("k"));
912        assert!(!store.scoped(Some("globex")).exists("k"));
913    }
914
915    #[test]
916    fn scoped_view_keys_filters_to_tenant() {
917        let store = StateStore::new();
918        store.scoped(Some("acme")).set("a", json!(1), "act");
919        store.scoped(Some("acme")).set("b", json!(2), "act");
920        store.scoped(Some("globex")).set("g", json!(9), "act");
921        store.set("unscoped", json!(0), "act");
922
923        let mut acme_keys = store.scoped(Some("acme")).keys();
924        acme_keys.sort();
925        assert_eq!(acme_keys, vec!["a", "b"]);
926
927        let globex_keys = store.scoped(Some("globex")).keys();
928        assert_eq!(globex_keys, vec!["g"]);
929    }
930
931    #[test]
932    fn unscoped_view_skips_tenant_prefixed_keys() {
933        // Calling scoped(None) — the legacy-compat path — must NOT
934        // accidentally expose other tenants' keys via `keys()`. This
935        // is the inverse of the isolation contract: the unscoped
936        // namespace shouldn't see scoped data even though it's all
937        // in the same backing HashMap.
938        let store = StateStore::new();
939        store.set("legacy", json!("ok"), "act");
940        store.scoped(Some("acme")).set("hidden", json!(42), "act");
941
942        let unscoped = store.scoped(None).keys();
943        assert_eq!(unscoped, vec!["legacy"]);
944        assert!(store.scoped(None).get("hidden").is_none());
945    }
946
947    #[test]
948    fn scoped_view_delete_doesnt_touch_other_tenants() {
949        let store = StateStore::new();
950        store.scoped(Some("acme")).set("shared", json!(1), "act");
951        store.scoped(Some("globex")).set("shared", json!(2), "act");
952
953        store.scoped(Some("acme")).delete("shared", "act");
954        assert!(!store.scoped(Some("acme")).exists("shared"));
955        assert!(store.scoped(Some("globex")).exists("shared"));
956    }
957
958    #[test]
959    fn empty_tenant_string_treated_as_unscoped() {
960        // Some(""): defensive — RuntimeScope normalizes empty strings
961        // to None at the dispatcher, but the view shouldn't trip if
962        // a caller passes an empty tenant by mistake.
963        let store = StateStore::new();
964        store.scoped(Some("")).set("k", json!(1), "act");
965        assert_eq!(store.get("k"), Some(json!(1)));
966        assert_eq!(store.scoped(None).get("k"), Some(json!(1)));
967    }
968}