Skip to main content

sim_lib_stream_fabric/
cassette.rs

1//! Ledger-backed storage for content-addressed eval replies.
2
3use std::collections::BTreeMap;
4use std::sync::{Arc, Mutex, MutexGuard};
5
6use sim_kernel::{ContentId, Error, EvalReply, HandleId, Ref, Result, effect_ledger::EffectLedger};
7
8use crate::content_key::ContentKey;
9
10/// Append and replay surface used by [`EvalCassette`].
11///
12/// The production adapter is [`EffectLedgerCassette`], which writes every
13/// recorded key to the kernel [`EffectLedger`] cassette table. The trait keeps
14/// `EvalCassette` independent of a concrete persistence backend while retaining
15/// fail-closed write semantics.
16pub trait EvalCassetteLedger: Send + Sync {
17    /// Appends one content key and reply pair.
18    fn append_eval_result(&self, key: &ContentKey, reply: &EvalReply) -> Result<()>;
19
20    /// Replays the ledger into key and reply pairs.
21    fn replay_eval_results(&self) -> Result<Vec<(ContentKey, EvalReply)>>;
22}
23
24/// A kernel [`EffectLedger`] adapter for [`EvalCassette`].
25///
26/// The kernel ledger stores replay hints as `ContentId -> Ref`. This adapter
27/// records that hint in the ledger and keeps the reply objects alongside it so a
28/// cassette created from the adapter can rebuild its hot-path map immediately.
29#[derive(Default)]
30pub struct EffectLedgerCassette {
31    ledger: Mutex<EffectLedger>,
32    entries: Mutex<Vec<(ContentKey, EvalReply)>>,
33}
34
35impl EffectLedgerCassette {
36    /// Creates an empty adapter backed by a fresh [`EffectLedger`].
37    pub fn new() -> Self {
38        Self::default()
39    }
40
41    /// Creates an adapter around an existing [`EffectLedger`].
42    pub fn with_ledger(ledger: EffectLedger) -> Self {
43        Self {
44            ledger: Mutex::new(ledger),
45            entries: Mutex::new(Vec::new()),
46        }
47    }
48
49    fn cassette_result(&self, key: &ContentKey) -> Result<Option<Ref>> {
50        let key_id = content_key_id(key)?;
51        Ok(lock(&self.ledger, "effect ledger")?
52            .cassette_result(&key_id)
53            .cloned())
54    }
55}
56
57impl EvalCassetteLedger for EffectLedgerCassette {
58    fn append_eval_result(&self, key: &ContentKey, reply: &EvalReply) -> Result<()> {
59        if self.cassette_result(key)?.is_some() {
60            return Ok(());
61        }
62
63        let key_id = content_key_id(key)?;
64        let result_ref = Ref::Handle(HandleId::fresh());
65        lock(&self.ledger, "effect ledger")?.insert_cassette_result(key_id, result_ref);
66        lock(&self.entries, "cassette entries")?.push((key.clone(), reply.clone()));
67        Ok(())
68    }
69
70    fn replay_eval_results(&self) -> Result<Vec<(ContentKey, EvalReply)>> {
71        Ok(lock(&self.entries, "cassette entries")?.clone())
72    }
73}
74
75/// An [`EffectLedger`]-backed cache of content-addressed eval replies.
76///
77/// The hot path is an in-memory [`BTreeMap`]. Recording a new reply writes to
78/// the configured ledger first, then updates the in-memory map. Constructing a
79/// cassette with [`EvalCassette::from_ledger`] replays ledger entries into a
80/// fresh map.
81pub struct EvalCassette {
82    ledger: Arc<dyn EvalCassetteLedger>,
83    cache: Mutex<BTreeMap<ContentKey, EvalReply>>,
84}
85
86impl EvalCassette {
87    /// Creates an empty cassette backed by `ledger`.
88    pub fn new(ledger: Arc<dyn EvalCassetteLedger>) -> Self {
89        Self {
90            ledger,
91            cache: Mutex::new(BTreeMap::new()),
92        }
93    }
94
95    /// Replays `ledger` into a fresh in-memory map.
96    pub fn from_ledger(ledger: Arc<dyn EvalCassetteLedger>) -> Result<Self> {
97        let mut cache = BTreeMap::new();
98        for (key, reply) in ledger.replay_eval_results()? {
99            cache.insert(key, reply);
100        }
101        Ok(Self {
102            ledger,
103            cache: Mutex::new(cache),
104        })
105    }
106
107    /// Returns a cached reply for `key`.
108    pub fn get(&self, key: &ContentKey) -> Option<EvalReply> {
109        self.cache.lock().ok()?.get(key).cloned()
110    }
111
112    /// Records `reply` under `key`, writing the ledger before updating cache.
113    pub fn record(&self, key: ContentKey, reply: EvalReply) -> Result<()> {
114        let mut cache = lock(&self.cache, "eval cassette cache")?;
115        if cache.contains_key(&key) {
116            return Ok(());
117        }
118        self.ledger.append_eval_result(&key, &reply)?;
119        cache.insert(key, reply);
120        Ok(())
121    }
122
123    /// Returns the number of cached replies.
124    pub fn len(&self) -> usize {
125        self.cache.lock().map_or(0, |cache| cache.len())
126    }
127
128    /// Returns whether the cassette has no cached replies.
129    pub fn is_empty(&self) -> bool {
130        self.len() == 0
131    }
132}
133
134fn content_key_id(key: &ContentKey) -> Result<ContentId> {
135    key.datum().content_id()
136}
137
138fn lock<'a, T>(mutex: &'a Mutex<T>, name: &str) -> Result<MutexGuard<'a, T>> {
139    mutex
140        .lock()
141        .map_err(|_| Error::Eval(format!("{name} mutex poisoned")))
142}
143
#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use sim_kernel::{
        CapabilityName, Consistency, Cx, EvalMode, EvalRequest, Expr, Value, testing::bare_cx as cx,
    };

    use super::*;

    /// In-memory ledger double that also counts the appends it receives.
    #[derive(Default)]
    struct MemoryLedger {
        entries: Mutex<Vec<(ContentKey, EvalReply)>>,
        writes: Mutex<usize>,
    }

    impl MemoryLedger {
        /// Seeds the ledger with a pair without counting it as a write.
        fn push_existing(&self, key: ContentKey, reply: EvalReply) {
            let mut entries = self.entries.lock().unwrap();
            entries.push((key, reply));
        }

        /// Number of `append_eval_result` calls seen so far.
        fn writes(&self) -> usize {
            let count = self.writes.lock().unwrap();
            *count
        }
    }

    impl EvalCassetteLedger for MemoryLedger {
        fn append_eval_result(&self, key: &ContentKey, reply: &EvalReply) -> Result<()> {
            {
                let mut writes = self.writes.lock().unwrap();
                *writes += 1;
            }
            let pair = (key.clone(), reply.clone());
            self.entries.lock().unwrap().push(pair);
            Ok(())
        }

        fn replay_eval_results(&self) -> Result<Vec<(ContentKey, EvalReply)>> {
            let snapshot = self.entries.lock().unwrap().clone();
            Ok(snapshot)
        }
    }

    /// Builds a minimal eval request whose expression is the string `expr`.
    fn request(expr: &str) -> EvalRequest {
        EvalRequest {
            expr: Expr::String(String::from(expr)),
            result_shape: None,
            required_capabilities: vec![CapabilityName::new("fabric.test")],
            deadline: None,
            consistency: Consistency::LocalFirst,
            mode: EvalMode::Eval,
            answer_limit: None,
            stream_buffer: None,
            stream: false,
            trace: false,
        }
    }

    /// Content key of the request built by [`request`] for `expr`.
    fn key(expr: &str) -> ContentKey {
        let req = request(expr);
        ContentKey::from_request(&req)
    }

    /// Reply carrying `value` as a string value, with no diagnostics or trace.
    fn reply(cx: &mut Cx, value: &str) -> EvalReply {
        let value = cx.factory().string(value.to_owned()).unwrap();
        EvalReply {
            value,
            diagnostics: Vec::new(),
            trace: None,
        }
    }

    /// Renders `value` through its object's `display`.
    fn value_display(cx: &mut Cx, value: &Value) -> String {
        value.object().display(cx).unwrap()
    }

    /// Asserts that `actual` and `expected` render to the same text.
    fn assert_same_display(cx: &mut Cx, actual: &Value, expected: &Value) {
        let actual = value_display(cx, actual);
        let expected = value_display(cx, expected);
        assert_eq!(actual, expected);
    }

    #[test]
    fn record_and_get_returns_stored_reply() {
        let mut ctx = cx();
        let cassette = EvalCassette::new(Arc::new(MemoryLedger::default()));
        let record_key = key("record");
        let original = reply(&mut ctx, "stored");

        cassette
            .record(record_key.clone(), original.clone())
            .unwrap();

        let stored = cassette.get(&record_key).unwrap();
        assert_same_display(&mut ctx, &stored.value, &original.value);
    }

    #[test]
    fn duplicate_record_is_idempotent() {
        let mut ctx = cx();
        let ledger = Arc::new(MemoryLedger::default());
        let cassette = EvalCassette::new(ledger.clone());
        let dup_key = key("duplicate");
        let first = reply(&mut ctx, "first");
        let second = reply(&mut ctx, "second");

        cassette.record(dup_key.clone(), first.clone()).unwrap();
        cassette.record(dup_key.clone(), second).unwrap();

        // Only the first record may reach the ledger.
        assert_eq!(ledger.writes(), 1);
        let stored = cassette.get(&dup_key).unwrap();
        assert_same_display(&mut ctx, &stored.value, &first.value);
    }

    #[test]
    fn from_ledger_replays_prior_entries() {
        let mut ctx = cx();
        let ledger = Arc::new(MemoryLedger::default());
        let (first_key, second_key) = (key("first"), key("second"));
        let first = reply(&mut ctx, "one");
        let second = reply(&mut ctx, "two");
        ledger.push_existing(first_key.clone(), first.clone());
        ledger.push_existing(second_key.clone(), second.clone());

        let cassette = EvalCassette::from_ledger(ledger).unwrap();

        assert_eq!(cassette.len(), 2);
        let replayed_first = cassette.get(&first_key).unwrap();
        assert_same_display(&mut ctx, &replayed_first.value, &first.value);
        let replayed_second = cassette.get(&second_key).unwrap();
        assert_same_display(&mut ctx, &replayed_second.value, &second.value);
    }
}