Skip to main content

car_state/
lib.rs

1//! State management for Common Agent Runtime.
2//!
3//! Provides structured, typed state with transition logging.
4//! Every mutation produces a StateTransition record for audit and replay.
5//!
6//! ## Persistence (Parslee-ai/car#181)
7//!
8//! `StateStore::durable(path)` opens a JSONL-backed store. Each
9//! mutation appends a transition line; on construction the file is
10//! replayed to rebuild current state. This is the agent-persistence
11//! pattern documented in `docs/persistence.md`. JSONL was chosen over
12//! sqlite/sled to stay aligned with the existing JSONL persistence
13//! used by `car-eventlog` and `car-memgine` — one file shape, one
14//! reap+compact story, no native build deps.
15//!
16//! Per-key TTL is supported via `set_with_ttl` — the in-memory state
17//! drops the key when `reap_expired(now)` runs after the deadline.
18//! The on-disk file is compacted at the same time so the journal
19//! doesn't grow unbounded.
20
21use chrono::{DateTime, Duration, Utc};
22use parking_lot::Mutex;
23use serde::{Deserialize, Serialize};
24use serde_json::Value;
25use std::collections::HashMap;
26use std::fs::{File, OpenOptions};
27use std::io::{BufRead, BufReader, BufWriter, Write};
28use std::path::{Path, PathBuf};
29
30/// An explicit record of a state change.
31///
32/// `ttl_secs` is optional — when present, the key expires `ttl_secs`
33/// seconds after `timestamp`. Reads return the value while it's
34/// live; `reap_expired` drops it after the deadline. The default
35/// (None) means "keep until explicitly deleted."
36#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
37pub struct StateTransition {
38    pub key: String,
39    pub old_value: Option<Value>,
40    pub new_value: Option<Value>,
41    pub action_id: String,
42    pub timestamp: DateTime<Utc>,
43    #[serde(default, skip_serializing_if = "Option::is_none")]
44    pub ttl_secs: Option<u64>,
45}
46
47/// Thread-safe state store with transition logging.
48///
49/// All reads and writes go through this store. Every write produces a
50/// StateTransition record for audit and replay. Optionally backed by
51/// a JSONL journal file for durability across process restarts (see
52/// [`StateStore::durable`]).
53pub struct StateStore {
54    state: Mutex<HashMap<String, Value>>,
55    transitions: Mutex<Vec<StateTransition>>,
56    /// Optional JSONL-backed durability layer. When set, every
57    /// `StateTransition` appended to the in-memory log is also
58    /// appended to this file's open writer; `reap_expired` rewrites
59    /// the file to compact away dropped keys.
60    journal: Mutex<Option<Journal>>,
61}
62
/// Open handle on the JSONL journal file backing a durable store.
struct Journal {
    /// Location of the journal file on disk.
    path: PathBuf,
    /// Buffered writer over the journal file.
    writer: BufWriter<File>,
}
67
68impl StateStore {
69    pub fn new() -> Self {
70        Self {
71            state: Mutex::new(HashMap::new()),
72            transitions: Mutex::new(Vec::new()),
73            journal: Mutex::new(None),
74        }
75    }
76
77    /// Open a durable, JSONL-backed StateStore. If the file exists,
78    /// its transitions are replayed (last-write-wins per key, with
79    /// TTLs honored) to rebuild current state. Subsequent writes
80    /// append to the same file.
81    ///
82    /// Returns an error only on filesystem-level failures (parent
83    /// directory missing, permission denied, etc.). Malformed lines
84    /// inside the journal are skipped with a warning rather than
85    /// failing the open — agent persistence shouldn't refuse to
86    /// start over a single bad line.
87    pub fn durable(path: impl Into<PathBuf>) -> std::io::Result<Self> {
88        let path = path.into();
89        if let Some(parent) = path.parent() {
90            if !parent.as_os_str().is_empty() {
91                std::fs::create_dir_all(parent)?;
92            }
93        }
94        let store = Self::new();
95        store.replay_journal(&path)?;
96        let file = OpenOptions::new().create(true).append(true).open(&path)?;
97        *store.journal.lock() = Some(Journal {
98            path,
99            writer: BufWriter::new(file),
100        });
101        Ok(store)
102    }
103
104    fn replay_journal(&self, path: &Path) -> std::io::Result<()> {
105        if !path.exists() {
106            return Ok(());
107        }
108        let file = File::open(path)?;
109        let reader = BufReader::new(file);
110        let now = Utc::now();
111        let mut state = self.state.lock();
112        let mut transitions = self.transitions.lock();
113        for line in reader.lines() {
114            let line = match line {
115                Ok(l) if l.trim().is_empty() => continue,
116                Ok(l) => l,
117                Err(_) => continue,
118            };
119            let Ok(t) = serde_json::from_str::<StateTransition>(&line) else {
120                // Malformed line. Don't refuse to boot over it.
121                tracing::warn!(
122                    journal = %path.display(),
123                    "skipping malformed StateStore journal line"
124                );
125                continue;
126            };
127            // Replay last-write-wins. TTLs that already expired are
128            // dropped at replay time so we don't surface stale data
129            // on first read.
130            if let (Some(ttl), Some(value)) = (t.ttl_secs, &t.new_value) {
131                if now.signed_duration_since(t.timestamp) > Duration::seconds(ttl as i64) {
132                    state.remove(&t.key);
133                } else {
134                    state.insert(t.key.clone(), value.clone());
135                }
136            } else if let Some(value) = &t.new_value {
137                state.insert(t.key.clone(), value.clone());
138            } else {
139                state.remove(&t.key);
140            }
141            transitions.push(t);
142        }
143        Ok(())
144    }
145
146    fn append_journal(&self, transition: &StateTransition) {
147        let mut journal = self.journal.lock();
148        let Some(journal) = journal.as_mut() else {
149            return;
150        };
151        // Best-effort: a failed disk write tracing::warn!s but the
152        // in-memory write already succeeded. Callers who need
153        // guaranteed durability should call `sync` after batches.
154        let Ok(json) = serde_json::to_string(transition) else {
155            return;
156        };
157        if let Err(e) = writeln!(journal.writer, "{json}") {
158            tracing::warn!(
159                journal = %journal.path.display(),
160                error = %e,
161                "StateStore journal append failed"
162            );
163            return;
164        }
165        let _ = journal.writer.flush();
166    }
167
168    /// Fsync the journal writer. Call after a batch of writes when
169    /// you need durability guarantees beyond best-effort flush.
170    pub fn sync(&self) -> std::io::Result<()> {
171        let mut journal = self.journal.lock();
172        let Some(journal) = journal.as_mut() else {
173            return Ok(());
174        };
175        journal.writer.flush()?;
176        journal.writer.get_ref().sync_all()
177    }
178
179    /// Drop expired keys (per `ttl_secs` on their last write) and
180    /// rewrite the journal as a compacted snapshot of the surviving
181    /// state. Returns the keys that were reaped.
182    ///
183    /// **TTL semantics**: a `ttl_secs` of 0 means "expired
184    /// immediately" — the key is reapable on the next call. There
185    /// is no "0 = forever" sentinel; use `set` (no TTL) for keys
186    /// that should never auto-expire.
187    ///
188    /// Latest-write-wins: a key rewritten WITHOUT a TTL after a
189    /// TTL'd write is NOT reaped — the more recent write
190    /// effectively cancels the TTL.
191    ///
192    /// Single-pass over the transitions log via a key→latest
193    /// index, so cost is O(n) in journal length (not O(n²)).
194    pub fn reap_expired(&self, now: DateTime<Utc>) -> std::io::Result<Vec<String>> {
195        let mut state = self.state.lock();
196        let mut transitions = self.transitions.lock();
197        // Build a single-pass index of the latest transition per
198        // key. Walking the whole log once is unavoidable; doing it
199        // ONCE keeps reap O(n) in journal length.
200        let mut latest_by_key: HashMap<&str, &StateTransition> = HashMap::new();
201        for t in transitions.iter() {
202            latest_by_key.insert(t.key.as_str(), t);
203        }
204        let expired: Vec<String> = latest_by_key
205            .iter()
206            .filter_map(|(_k, t)| {
207                let ttl = t.ttl_secs?;
208                if t.new_value.is_none() {
209                    return None;
210                }
211                let age = now.signed_duration_since(t.timestamp);
212                (age > Duration::seconds(ttl as i64)).then(|| t.key.clone())
213            })
214            .collect();
215        let mut reaped = Vec::new();
216        for key in expired {
217            if state.remove(&key).is_some() {
218                reaped.push(key.clone());
219                transitions.push(StateTransition {
220                    key,
221                    old_value: None,
222                    new_value: None,
223                    action_id: "reap".to_string(),
224                    timestamp: now,
225                    ttl_secs: None,
226                });
227            }
228        }
229        drop(state);
230        drop(transitions);
231        if !reaped.is_empty() {
232            self.compact_journal()?;
233        }
234        Ok(reaped)
235    }
236
237    /// Rewrite the journal as a flat snapshot of the current state —
238    /// one transition per surviving key, no replay history. Reduces
239    /// journal size without changing observable behavior.
240    ///
241    /// **Concurrency requirement**: callers MUST hold the
242    /// observation that no other thread is mid-`set`/`delete` on
243    /// this store; the snapshot is taken under the state lock, but
244    /// in-flight journal appends to the *old* file handle that
245    /// land between snapshot and rename are lost. Today's only
246    /// caller is `reap_expired`, which holds both locks across the
247    /// call; external callers should serialize themselves.
248    pub(crate) fn compact_journal(&self) -> std::io::Result<()> {
249        let mut journal = self.journal.lock();
250        let Some(j) = journal.as_mut() else {
251            return Ok(());
252        };
253        let state = self.state.lock().clone();
254        let tmp_path = j.path.with_extension("jsonl.tmp");
255        {
256            let tmp_file = File::create(&tmp_path)?;
257            let mut writer = BufWriter::new(tmp_file);
258            for (key, value) in &state {
259                let t = StateTransition {
260                    key: key.clone(),
261                    old_value: None,
262                    new_value: Some(value.clone()),
263                    action_id: "compact".to_string(),
264                    timestamp: Utc::now(),
265                    ttl_secs: None,
266                };
267                let line = serde_json::to_string(&t)?;
268                writeln!(writer, "{line}")?;
269            }
270            writer.flush()?;
271            writer.get_ref().sync_all()?;
272        }
273        std::fs::rename(&tmp_path, &j.path)?;
274        let file = OpenOptions::new().create(true).append(true).open(&j.path)?;
275        j.writer = BufWriter::new(file);
276        Ok(())
277    }
278
279    pub fn get(&self, key: &str) -> Option<Value> {
280        self.state.lock().get(key).cloned()
281    }
282
283    pub fn get_or(&self, key: &str, default: Value) -> Value {
284        self.state.lock().get(key).cloned().unwrap_or(default)
285    }
286
287    pub fn exists(&self, key: &str) -> bool {
288        self.state.lock().contains_key(key)
289    }
290
291    pub fn set(&self, key: &str, value: Value, action_id: &str) -> StateTransition {
292        self.set_inner(key, value, action_id, None)
293    }
294
295    /// Set a key with a TTL (seconds from now). `reap_expired`
296    /// drops the key once the deadline passes; re-setting the key
297    /// without a TTL (`set`) cancels the TTL.
298    ///
299    /// `ttl_secs == 0` means "expire immediately" (reapable on the
300    /// next `reap_expired` call). It is NOT a "no expiry" sentinel
301    /// — use the plain `set(...)` method for keys that should
302    /// never auto-expire. This differs from the Unix/Redis
303    /// convention; the distinction matters because a TTL passed
304    /// from untrusted input could otherwise silently mean
305    /// "forever" when the caller intended "never store."
306    pub fn set_with_ttl(
307        &self,
308        key: &str,
309        value: Value,
310        action_id: &str,
311        ttl_secs: u64,
312    ) -> StateTransition {
313        self.set_inner(key, value, action_id, Some(ttl_secs))
314    }
315
316    fn set_inner(
317        &self,
318        key: &str,
319        value: Value,
320        action_id: &str,
321        ttl_secs: Option<u64>,
322    ) -> StateTransition {
323        let mut state = self.state.lock();
324        let old = state.get(key).cloned();
325        state.insert(key.to_string(), value.clone());
326
327        let t = StateTransition {
328            key: key.to_string(),
329            old_value: old,
330            new_value: Some(value),
331            action_id: action_id.to_string(),
332            timestamp: Utc::now(),
333            ttl_secs,
334        };
335
336        self.transitions.lock().push(t.clone());
337        self.append_journal(&t);
338        t
339    }
340
341    pub fn delete(&self, key: &str, action_id: &str) -> Option<StateTransition> {
342        let mut state = self.state.lock();
343        let old = state.remove(key)?;
344
345        let t = StateTransition {
346            key: key.to_string(),
347            old_value: Some(old),
348            new_value: None,
349            action_id: action_id.to_string(),
350            timestamp: Utc::now(),
351            ttl_secs: None,
352        };
353
354        self.transitions.lock().push(t.clone());
355        self.append_journal(&t);
356        Some(t)
357    }
358
359    /// Deep clone of current state.
360    pub fn snapshot(&self) -> HashMap<String, Value> {
361        self.state.lock().clone()
362    }
363
364    /// Restore state from a snapshot, truncating transitions.
365    pub fn restore(&self, snapshot: HashMap<String, Value>, transition_count: usize) {
366        *self.state.lock() = snapshot;
367        self.transitions.lock().truncate(transition_count);
368    }
369
370    pub fn transition_count(&self) -> usize {
371        self.transitions.lock().len()
372    }
373
374    pub fn transitions(&self) -> Vec<StateTransition> {
375        self.transitions.lock().clone()
376    }
377
378    pub fn transitions_since(&self, index: usize) -> Vec<StateTransition> {
379        let transitions = self.transitions.lock();
380        let start = index.min(transitions.len());
381        transitions[start..].to_vec()
382    }
383
384    pub fn keys(&self) -> Vec<String> {
385        self.state.lock().keys().cloned().collect()
386    }
387
388    /// Replace the entire state map without recording transitions.
389    /// Used by checkpoint restore to avoid synthetic transition history.
390    /// Also clears the transitions log so callers of `transitions_since()`
391    /// don't see stale history from the discarded state.
392    pub fn replace_all(&self, snapshot: HashMap<String, Value>) {
393        *self.state.lock() = snapshot;
394        self.transitions.lock().clear();
395    }
396
397    /// Build a tenant-scoped view over this store
398    /// (Parslee-ai/car#187 phase 3 enforcement).
399    ///
400    /// All reads / writes go through `tenant:<tenant_id>:<key>` so
401    /// distinct tenants can't see each other's keys. `tenant = None`
402    /// returns a view that hits the unscoped (legacy) namespace —
403    /// callers that don't yet have a `RuntimeScope` get pre-#187
404    /// behaviour automatically.
405    ///
406    /// Cheap to construct; holds a `&self` borrow plus the tenant
407    /// string. The view's methods take the parking-lot lock the same
408    /// way the unscoped methods do.
409    pub fn scoped<'a>(&'a self, tenant: Option<&'a str>) -> ScopedStateView<'a> {
410        ScopedStateView {
411            store: self,
412            tenant,
413        }
414    }
415}
416
417/// Tenant-scoped view over a [`StateStore`]. All key arguments are
418/// transparently prefixed with `tenant:<tenant_id>:` before hitting
419/// the underlying store; on the way out, the prefix is stripped so
420/// callers see their original keys.
421///
422/// Construct via [`StateStore::scoped`]. When `tenant` is `None`,
423/// the prefix is empty and the view is functionally equivalent to
424/// the unscoped methods on `StateStore` — useful for code paths
425/// that always go through this view regardless of whether scope is
426/// active.
427///
428/// # Isolation guarantee
429///
430/// Two views with distinct `tenant` strings cannot observe each
431/// other's writes through `get` / `exists` / `keys`. The transitions
432/// log still records the full (prefixed) key so audit / replay sees
433/// the actual storage layout.
434///
435/// # What isolation does *not* cover (phase 3 follow-ups)
436///
437/// - `StateStore::snapshot` / `restore` are deliberately unscoped —
438///   they're called at proposal boundaries for rollback and need to
439///   see the whole map. Per-tenant partial rollback is a known
440///   concurrency hole when multiple proposals run interleaved; the
441///   pre-#187 baseline has the same issue, and fixing it cleanly
442///   requires either serializing per-tenant or extending the
443///   transactional model. Tracked as a follow-up.
444/// - The journal file (when durability is on) records full
445///   prefixed keys. Operators rotating tenants out can grep the
446///   journal by prefix.
447pub struct ScopedStateView<'a> {
448    store: &'a StateStore,
449    tenant: Option<&'a str>,
450}
451
452impl<'a> ScopedStateView<'a> {
453    fn full_key(&self, key: &str) -> String {
454        match self.tenant {
455            Some(t) if !t.is_empty() => format!("tenant:{t}:{key}"),
456            _ => key.to_string(),
457        }
458    }
459
460    fn strip_prefix<'k>(&self, full: &'k str) -> Option<&'k str> {
461        match self.tenant {
462            Some(t) if !t.is_empty() => {
463                let prefix = format!("tenant:{t}:");
464                full.strip_prefix(&prefix)
465            }
466            _ => Some(full),
467        }
468    }
469
470    pub fn get(&self, key: &str) -> Option<Value> {
471        self.store.get(&self.full_key(key))
472    }
473
474    pub fn get_or(&self, key: &str, default: Value) -> Value {
475        self.store.get_or(&self.full_key(key), default)
476    }
477
478    pub fn exists(&self, key: &str) -> bool {
479        self.store.exists(&self.full_key(key))
480    }
481
482    pub fn set(&self, key: &str, value: Value, action_id: &str) -> StateTransition {
483        self.store.set(&self.full_key(key), value, action_id)
484    }
485
486    pub fn set_with_ttl(
487        &self,
488        key: &str,
489        value: Value,
490        action_id: &str,
491        ttl_secs: u64,
492    ) -> StateTransition {
493        self.store
494            .set_with_ttl(&self.full_key(key), value, action_id, ttl_secs)
495    }
496
497    pub fn delete(&self, key: &str, action_id: &str) -> Option<StateTransition> {
498        self.store.delete(&self.full_key(key), action_id)
499    }
500
501    /// Return keys belonging to this tenant only, with the
502    /// `tenant:<id>:` prefix stripped so callers see their original
503    /// key names. Unscoped views (no tenant) return only keys that
504    /// don't start with `tenant:` — preventing accidental visibility
505    /// of scoped state through a legacy code path.
506    pub fn keys(&self) -> Vec<String> {
507        self.store
508            .keys()
509            .into_iter()
510            .filter_map(|k| {
511                if self.tenant.map(|t| !t.is_empty()).unwrap_or(false) {
512                    self.strip_prefix(&k).map(str::to_string)
513                } else if k.starts_with("tenant:") {
514                    None
515                } else {
516                    Some(k)
517                }
518            })
519            .collect()
520    }
521}
522
523impl Default for StateStore {
524    fn default() -> Self {
525        Self::new()
526    }
527}
528
529impl car_ir::precondition::StateView for StateStore {
530    fn get_value(&self, key: &str) -> Option<Value> {
531        self.get(key)
532    }
533    fn key_exists(&self, key: &str) -> bool {
534        self.exists(key)
535    }
536}
537
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Fresh temp directory plus the journal path inside it. Keep the
    /// returned `TempDir` alive for as long as the path is in use.
    fn temp_journal() -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("state.jsonl");
        (dir, path)
    }

    #[test]
    fn set_and_get() {
        let s = StateStore::new();
        s.set("x", json!(42), "test");
        assert_eq!(s.get("x"), Some(json!(42)));
    }

    #[test]
    fn exists() {
        let s = StateStore::new();
        assert!(!s.exists("x"));
        s.set("x", json!(1), "test");
        assert!(s.exists("x"));
    }

    #[test]
    fn delete() {
        let s = StateStore::new();
        s.set("x", json!(1), "test");
        let removed = s.delete("x", "test");
        assert!(removed.is_some());
        assert!(!s.exists("x"));
    }

    #[test]
    fn delete_nonexistent() {
        let s = StateStore::new();
        let removed = s.delete("x", "test");
        assert!(removed.is_none());
    }

    #[test]
    fn snapshot_and_restore() {
        let s = StateStore::new();
        s.set("x", json!(1), "a");
        let saved_state = s.snapshot();
        let saved_len = s.transition_count();

        s.set("y", json!(2), "b");
        assert!(s.exists("y"));

        s.restore(saved_state, saved_len);
        assert!(s.exists("x"));
        assert!(!s.exists("y"));
        assert_eq!(s.transition_count(), 1);
    }

    #[test]
    fn transitions_logged() {
        let s = StateStore::new();
        s.set("a", json!(1), "act1");
        s.set("b", json!(2), "act2");

        let log = s.transitions();
        assert_eq!(log.len(), 2);
        assert_eq!(log[0].key, "a");
        assert_eq!(log[1].key, "b");
    }

    #[test]
    fn transitions_since() {
        let s = StateStore::new();
        s.set("a", json!(1), "act1");
        let mark = s.transition_count();
        s.set("b", json!(2), "act2");

        let tail = s.transitions_since(mark);
        assert_eq!(tail.len(), 1);
        assert_eq!(tail[0].key, "b");
    }

    #[test]
    fn transition_records_old_value() {
        let s = StateStore::new();
        s.set("x", json!(1), "first");
        s.set("x", json!(2), "second");

        let log = s.transitions();
        assert_eq!(log[1].old_value, Some(json!(1)));
        assert_eq!(log[1].new_value, Some(json!(2)));
    }

    #[test]
    fn keys() {
        let s = StateStore::new();
        s.set("a", json!(1), "t");
        s.set("b", json!(2), "t");
        let mut names = s.keys();
        names.sort();
        assert_eq!(names, vec!["a", "b"]);
    }

    #[test]
    fn transitions_since_after_restore_does_not_panic() {
        let s = StateStore::new();
        s.set("a", json!(1), "test");
        s.set("b", json!(2), "test");
        let stale_mark = s.transition_count(); // 2

        // Roll back to empty; the transition log is truncated to 0.
        s.restore(HashMap::new(), 0);

        // The stale mark (2) now points past the end; must not panic.
        let tail = s.transitions_since(stale_mark);
        assert!(tail.is_empty());
    }

    #[test]
    fn transitions_since_normal_usage() {
        let s = StateStore::new();
        s.set("a", json!(1), "test");
        let mark = s.transition_count();
        s.set("b", json!(2), "test");
        let tail = s.transitions_since(mark);
        assert_eq!(tail.len(), 1);
        assert_eq!(tail[0].key, "b");
    }

    #[test]
    fn replace_all_swaps_state_without_transitions() {
        let s = StateStore::new();
        s.set("old_key", json!("old"), "setup");

        let mut replacement = HashMap::new();
        replacement.insert("new_key".to_string(), json!("new"));
        s.replace_all(replacement);

        assert_eq!(s.get("new_key"), Some(json!("new")));
        assert_eq!(s.get("old_key"), None);
        // replace_all clears the transition log rather than keeping it.
        assert_eq!(s.transition_count(), 0);
    }

    #[test]
    fn durable_store_survives_reopen() {
        let (_dir, path) = temp_journal();
        {
            let first = StateStore::durable(&path).unwrap();
            first.set("agent", json!("planner"), "boot");
            first.set("turns", json!(42), "tick");
            first.sync().unwrap();
        }
        let reopened = StateStore::durable(&path).unwrap();
        assert_eq!(reopened.get("agent"), Some(json!("planner")));
        assert_eq!(reopened.get("turns"), Some(json!(42)));
    }

    #[test]
    fn durable_store_replays_deletes() {
        let (_dir, path) = temp_journal();
        {
            let first = StateStore::durable(&path).unwrap();
            first.set("transient", json!("x"), "boot");
            first.delete("transient", "rm");
            first.sync().unwrap();
        }
        let reopened = StateStore::durable(&path).unwrap();
        assert!(!reopened.exists("transient"));
    }

    #[test]
    fn ttl_reap_drops_expired_and_keeps_fresh() {
        let s = StateStore::new();
        s.set_with_ttl("short", json!(1), "set", 0);
        s.set_with_ttl("long", json!(2), "set", 3600);
        s.set("forever", json!(3), "set");
        // At now + 10s: "short" (ttl=0) has expired, "long"
        // (ttl=3600) has not, and "forever" carries no TTL.
        let later = Utc::now() + Duration::seconds(10);
        let reaped = s.reap_expired(later).unwrap();
        assert_eq!(reaped, vec!["short".to_string()]);
        assert!(!s.exists("short"));
        assert_eq!(s.get("long"), Some(json!(2)));
        assert_eq!(s.get("forever"), Some(json!(3)));
    }

    #[test]
    fn durable_ttl_compacts_journal() {
        let (_dir, path) = temp_journal();
        {
            let s = StateStore::durable(&path).unwrap();
            for i in 0..50 {
                s.set_with_ttl(&format!("k{i}"), json!(i), "set", 0);
            }
            s.set("survivor", json!("kept"), "set");
            s.sync().unwrap();
            let size_before = std::fs::metadata(&path).unwrap().len();
            // Move the clock past the 0s TTL to force expiry.
            let later = Utc::now() + Duration::seconds(1);
            let reaped = s.reap_expired(later).unwrap();
            assert_eq!(reaped.len(), 50);
            s.sync().unwrap();
            let size_after = std::fs::metadata(&path).unwrap().len();
            // 51 lines before compaction (50 TTL'd writes + 1
            // survivor) versus 1 line after, so the file must shrink.
            assert!(
                size_after < size_before,
                "post={size_after} pre={size_before} — compaction did not shrink"
            );
        }
        // After reopening, only the survivor is left.
        let reopened = StateStore::durable(&path).unwrap();
        assert!(!reopened.exists("k0"));
        assert!(!reopened.exists("k49"));
        assert_eq!(reopened.get("survivor"), Some(json!("kept")));
    }

    #[test]
    fn ttl_then_rewrite_without_ttl_does_not_reap() {
        let s = StateStore::new();
        s.set_with_ttl("k", json!("a"), "first", 0);
        s.set("k", json!("b"), "second"); // no TTL
        let later = Utc::now() + Duration::seconds(10);
        let reaped = s.reap_expired(later).unwrap();
        assert!(reaped.is_empty());
        assert_eq!(s.get("k"), Some(json!("b")));
    }

    #[test]
    fn malformed_journal_line_is_skipped_not_fatal() {
        let (_dir, path) = temp_journal();
        // Journal contents: a good line, a bad line, another good line.
        let contents = "{\"key\":\"a\",\"old_value\":null,\"new_value\":1,\"action_id\":\"x\",\"timestamp\":\"2026-05-11T00:00:00Z\"}\n\
                        not-json\n\
                        {\"key\":\"b\",\"old_value\":null,\"new_value\":2,\"action_id\":\"x\",\"timestamp\":\"2026-05-11T00:00:00Z\"}\n";
        std::fs::write(&path, contents).unwrap();

        let s = StateStore::durable(&path).unwrap();
        assert_eq!(s.get("a"), Some(json!(1)));
        assert_eq!(s.get("b"), Some(json!(2)));
    }

    // ScopedStateView tests — Parslee-ai/car#187 phase 3 enforcement.

    #[test]
    fn scoped_view_writes_isolate_between_tenants() {
        let s = StateStore::new();
        let acme = s.scoped(Some("acme"));
        let globex = s.scoped(Some("globex"));
        acme.set("config", json!("A"), "act");
        globex.set("config", json!("G"), "act");

        // Same key name, but each tenant reads back its own value.
        assert_eq!(acme.get("config"), Some(json!("A")));
        assert_eq!(globex.get("config"), Some(json!("G")));
    }

    #[test]
    fn scoped_view_isolates_existence_check() {
        let s = StateStore::new();
        let acme = s.scoped(Some("acme"));
        let globex = s.scoped(Some("globex"));
        acme.set("k", json!(1), "act");
        assert!(acme.exists("k"));
        assert!(!globex.exists("k"));
    }

    #[test]
    fn scoped_view_keys_filters_to_tenant() {
        let s = StateStore::new();
        let acme = s.scoped(Some("acme"));
        let globex = s.scoped(Some("globex"));
        acme.set("a", json!(1), "act");
        acme.set("b", json!(2), "act");
        globex.set("g", json!(9), "act");
        s.set("unscoped", json!(0), "act");

        let mut acme_keys = acme.keys();
        acme_keys.sort();
        assert_eq!(acme_keys, vec!["a", "b"]);

        let globex_keys = globex.keys();
        assert_eq!(globex_keys, vec!["g"]);
    }

    #[test]
    fn unscoped_view_skips_tenant_prefixed_keys() {
        // scoped(None) is the legacy-compat path. Its `keys()` must
        // NOT leak other tenants' keys: the unscoped namespace should
        // not see scoped data even though both live in the same
        // backing HashMap.
        let s = StateStore::new();
        s.set("legacy", json!("ok"), "act");
        s.scoped(Some("acme")).set("hidden", json!(42), "act");

        let legacy = s.scoped(None);
        assert_eq!(legacy.keys(), vec!["legacy"]);
        assert!(legacy.get("hidden").is_none());
    }

    #[test]
    fn scoped_view_delete_doesnt_touch_other_tenants() {
        let s = StateStore::new();
        let acme = s.scoped(Some("acme"));
        let globex = s.scoped(Some("globex"));
        acme.set("shared", json!(1), "act");
        globex.set("shared", json!(2), "act");

        acme.delete("shared", "act");
        assert!(!acme.exists("shared"));
        assert!(globex.exists("shared"));
    }

    #[test]
    fn empty_tenant_string_treated_as_unscoped() {
        // Some("") is handled defensively: RuntimeScope normalizes
        // empty strings to None at the dispatcher, but the view must
        // still behave if a caller passes an empty tenant by mistake.
        let s = StateStore::new();
        s.scoped(Some("")).set("k", json!(1), "act");
        assert_eq!(s.get("k"), Some(json!(1)));
        assert_eq!(s.scoped(None).get("k"), Some(json!(1)));
    }
}