1use std::collections::BTreeMap;
4use std::sync::{Arc, Mutex, MutexGuard};
5
6use sim_kernel::{ContentId, Error, EvalReply, HandleId, Ref, Result, effect_ledger::EffectLedger};
7
8use crate::content_key::ContentKey;
9
10pub trait EvalCassetteLedger: Send + Sync {
17 fn append_eval_result(&self, key: &ContentKey, reply: &EvalReply) -> Result<()>;
19
20 fn replay_eval_results(&self) -> Result<Vec<(ContentKey, EvalReply)>>;
22}
23
24#[derive(Default)]
30pub struct EffectLedgerCassette {
31 ledger: Mutex<EffectLedger>,
32 entries: Mutex<Vec<(ContentKey, EvalReply)>>,
33}
34
35impl EffectLedgerCassette {
36 pub fn new() -> Self {
38 Self::default()
39 }
40
41 pub fn with_ledger(ledger: EffectLedger) -> Self {
43 Self {
44 ledger: Mutex::new(ledger),
45 entries: Mutex::new(Vec::new()),
46 }
47 }
48
49 fn cassette_result(&self, key: &ContentKey) -> Result<Option<Ref>> {
50 let key_id = content_key_id(key)?;
51 Ok(lock(&self.ledger, "effect ledger")?
52 .cassette_result(&key_id)
53 .cloned())
54 }
55}
56
57impl EvalCassetteLedger for EffectLedgerCassette {
58 fn append_eval_result(&self, key: &ContentKey, reply: &EvalReply) -> Result<()> {
59 if self.cassette_result(key)?.is_some() {
60 return Ok(());
61 }
62
63 let key_id = content_key_id(key)?;
64 let result_ref = Ref::Handle(HandleId::fresh());
65 lock(&self.ledger, "effect ledger")?.insert_cassette_result(key_id, result_ref);
66 lock(&self.entries, "cassette entries")?.push((key.clone(), reply.clone()));
67 Ok(())
68 }
69
70 fn replay_eval_results(&self) -> Result<Vec<(ContentKey, EvalReply)>> {
71 Ok(lock(&self.entries, "cassette entries")?.clone())
72 }
73}
74
75pub struct EvalCassette {
82 ledger: Arc<dyn EvalCassetteLedger>,
83 cache: Mutex<BTreeMap<ContentKey, EvalReply>>,
84}
85
86impl EvalCassette {
87 pub fn new(ledger: Arc<dyn EvalCassetteLedger>) -> Self {
89 Self {
90 ledger,
91 cache: Mutex::new(BTreeMap::new()),
92 }
93 }
94
95 pub fn from_ledger(ledger: Arc<dyn EvalCassetteLedger>) -> Result<Self> {
97 let mut cache = BTreeMap::new();
98 for (key, reply) in ledger.replay_eval_results()? {
99 cache.insert(key, reply);
100 }
101 Ok(Self {
102 ledger,
103 cache: Mutex::new(cache),
104 })
105 }
106
107 pub fn get(&self, key: &ContentKey) -> Option<EvalReply> {
109 self.cache.lock().ok()?.get(key).cloned()
110 }
111
112 pub fn record(&self, key: ContentKey, reply: EvalReply) -> Result<()> {
114 let mut cache = lock(&self.cache, "eval cassette cache")?;
115 if cache.contains_key(&key) {
116 return Ok(());
117 }
118 self.ledger.append_eval_result(&key, &reply)?;
119 cache.insert(key, reply);
120 Ok(())
121 }
122
123 pub fn len(&self) -> usize {
125 self.cache.lock().map_or(0, |cache| cache.len())
126 }
127
128 pub fn is_empty(&self) -> bool {
130 self.len() == 0
131 }
132}
133
134fn content_key_id(key: &ContentKey) -> Result<ContentId> {
135 key.datum().content_id()
136}
137
138fn lock<'a, T>(mutex: &'a Mutex<T>, name: &str) -> Result<MutexGuard<'a, T>> {
139 mutex
140 .lock()
141 .map_err(|_| Error::Eval(format!("{name} mutex poisoned")))
142}
143
#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use sim_kernel::{
        CapabilityName, Consistency, Cx, EvalMode, EvalRequest, Expr, Value, testing::bare_cx as cx,
    };

    use super::*;

    /// Test double for [`EvalCassetteLedger`]: keeps entries in a `Vec` and
    /// counts how many times `append_eval_result` was invoked.
    #[derive(Default)]
    struct MemoryLedger {
        entries: Mutex<Vec<(ContentKey, EvalReply)>>,
        writes: Mutex<usize>,
    }

    impl MemoryLedger {
        /// Seeds the ledger directly, bypassing the write counter.
        fn push_existing(&self, key: ContentKey, reply: EvalReply) {
            let mut entries = self.entries.lock().unwrap();
            entries.push((key, reply));
        }

        /// Number of `append_eval_result` calls observed so far.
        fn writes(&self) -> usize {
            let counter = self.writes.lock().unwrap();
            *counter
        }
    }

    impl EvalCassetteLedger for MemoryLedger {
        fn append_eval_result(&self, key: &ContentKey, reply: &EvalReply) -> Result<()> {
            {
                // Scoped so the counter lock is released before touching entries.
                let mut counter = self.writes.lock().unwrap();
                *counter += 1;
            }
            let mut entries = self.entries.lock().unwrap();
            entries.push((key.clone(), reply.clone()));
            Ok(())
        }

        fn replay_eval_results(&self) -> Result<Vec<(ContentKey, EvalReply)>> {
            let entries = self.entries.lock().unwrap();
            Ok(entries.clone())
        }
    }

    /// Builds a minimal eval request whose expression is the string `expr`.
    fn request(expr: &str) -> EvalRequest {
        let capability = CapabilityName::new("fabric.test");
        EvalRequest {
            expr: Expr::String(String::from(expr)),
            result_shape: None,
            required_capabilities: vec![capability],
            deadline: None,
            consistency: Consistency::LocalFirst,
            mode: EvalMode::Eval,
            answer_limit: None,
            stream_buffer: None,
            stream: false,
            trace: false,
        }
    }

    /// Content key for the request built from `expr`.
    fn key(expr: &str) -> ContentKey {
        let req = request(expr);
        ContentKey::from_request(&req)
    }

    /// Reply carrying `value` as a string value, with no diagnostics or trace.
    fn reply(cx: &mut Cx, value: &str) -> EvalReply {
        let payload = cx.factory().string(value.to_owned()).unwrap();
        EvalReply {
            value: payload,
            diagnostics: Vec::new(),
            trace: None,
        }
    }

    /// Renders `value` through its object's `display`, for comparisons.
    fn value_display(cx: &mut Cx, value: &Value) -> String {
        value.object().display(cx).unwrap()
    }

    #[test]
    fn record_and_get_returns_stored_reply() {
        let mut ctx = cx();
        let ledger = Arc::new(MemoryLedger::default());
        let cassette = EvalCassette::new(ledger);
        let lookup = key("record");
        let expected = reply(&mut ctx, "stored");

        cassette.record(lookup.clone(), expected.clone()).unwrap();

        let actual = cassette.get(&lookup).unwrap();
        let actual_text = value_display(&mut ctx, &actual.value);
        let expected_text = value_display(&mut ctx, &expected.value);
        assert_eq!(actual_text, expected_text);
    }

    #[test]
    fn duplicate_record_is_idempotent() {
        let mut ctx = cx();
        let ledger = Arc::new(MemoryLedger::default());
        let cassette = EvalCassette::new(ledger.clone());
        let lookup = key("duplicate");
        let original = reply(&mut ctx, "first");

        cassette.record(lookup.clone(), original.clone()).unwrap();
        let replacement = reply(&mut ctx, "second");
        cassette.record(lookup.clone(), replacement).unwrap();

        // Only the first record may reach the ledger.
        assert_eq!(ledger.writes(), 1);

        let actual = cassette.get(&lookup).unwrap();
        let actual_text = value_display(&mut ctx, &actual.value);
        let original_text = value_display(&mut ctx, &original.value);
        assert_eq!(actual_text, original_text);
    }

    #[test]
    fn from_ledger_replays_prior_entries() {
        let mut ctx = cx();
        let ledger = Arc::new(MemoryLedger::default());
        let key_one = key("first");
        let key_two = key("second");
        let reply_one = reply(&mut ctx, "one");
        let reply_two = reply(&mut ctx, "two");
        ledger.push_existing(key_one.clone(), reply_one.clone());
        ledger.push_existing(key_two.clone(), reply_two.clone());

        let cassette = EvalCassette::from_ledger(ledger).unwrap();

        assert_eq!(cassette.len(), 2);

        let replayed_one = cassette.get(&key_one).unwrap();
        assert_eq!(
            value_display(&mut ctx, &replayed_one.value),
            value_display(&mut ctx, &reply_one.value)
        );

        let replayed_two = cassette.get(&key_two).unwrap();
        assert_eq!(
            value_display(&mut ctx, &replayed_two.value),
            value_display(&mut ctx, &reply_two.value)
        );
    }
}