use std::collections::BTreeMap;
use std::sync::{Arc, Mutex, MutexGuard};
use sim_kernel::{ContentId, Error, EvalReply, HandleId, Ref, Result, effect_ledger::EffectLedger};
use crate::content_key::ContentKey;
pub trait EvalCassetteLedger: Send + Sync {
fn append_eval_result(&self, key: &ContentKey, reply: &EvalReply) -> Result<()>;
fn replay_eval_results(&self) -> Result<Vec<(ContentKey, EvalReply)>>;
}
#[derive(Default)]
pub struct EffectLedgerCassette {
ledger: Mutex<EffectLedger>,
entries: Mutex<Vec<(ContentKey, EvalReply)>>,
}
impl EffectLedgerCassette {
pub fn new() -> Self {
Self::default()
}
pub fn with_ledger(ledger: EffectLedger) -> Self {
Self {
ledger: Mutex::new(ledger),
entries: Mutex::new(Vec::new()),
}
}
fn cassette_result(&self, key: &ContentKey) -> Result<Option<Ref>> {
let key_id = content_key_id(key)?;
Ok(lock(&self.ledger, "effect ledger")?
.cassette_result(&key_id)
.cloned())
}
}
impl EvalCassetteLedger for EffectLedgerCassette {
fn append_eval_result(&self, key: &ContentKey, reply: &EvalReply) -> Result<()> {
if self.cassette_result(key)?.is_some() {
return Ok(());
}
let key_id = content_key_id(key)?;
let result_ref = Ref::Handle(HandleId::fresh());
lock(&self.ledger, "effect ledger")?.insert_cassette_result(key_id, result_ref);
lock(&self.entries, "cassette entries")?.push((key.clone(), reply.clone()));
Ok(())
}
fn replay_eval_results(&self) -> Result<Vec<(ContentKey, EvalReply)>> {
Ok(lock(&self.entries, "cassette entries")?.clone())
}
}
pub struct EvalCassette {
ledger: Arc<dyn EvalCassetteLedger>,
cache: Mutex<BTreeMap<ContentKey, EvalReply>>,
}
impl EvalCassette {
pub fn new(ledger: Arc<dyn EvalCassetteLedger>) -> Self {
Self {
ledger,
cache: Mutex::new(BTreeMap::new()),
}
}
pub fn from_ledger(ledger: Arc<dyn EvalCassetteLedger>) -> Result<Self> {
let mut cache = BTreeMap::new();
for (key, reply) in ledger.replay_eval_results()? {
cache.insert(key, reply);
}
Ok(Self {
ledger,
cache: Mutex::new(cache),
})
}
pub fn get(&self, key: &ContentKey) -> Option<EvalReply> {
self.cache.lock().ok()?.get(key).cloned()
}
pub fn record(&self, key: ContentKey, reply: EvalReply) -> Result<()> {
let mut cache = lock(&self.cache, "eval cassette cache")?;
if cache.contains_key(&key) {
return Ok(());
}
self.ledger.append_eval_result(&key, &reply)?;
cache.insert(key, reply);
Ok(())
}
pub fn len(&self) -> usize {
self.cache.lock().map_or(0, |cache| cache.len())
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
fn content_key_id(key: &ContentKey) -> Result<ContentId> {
key.datum().content_id()
}
fn lock<'a, T>(mutex: &'a Mutex<T>, name: &str) -> Result<MutexGuard<'a, T>> {
mutex
.lock()
.map_err(|_| Error::Eval(format!("{name} mutex poisoned")))
}
#[cfg(test)]
mod tests {
use std::sync::Mutex;
use sim_kernel::{
CapabilityName, Consistency, Cx, EvalMode, EvalRequest, Expr, Value, testing::bare_cx as cx,
};
use super::*;
#[derive(Default)]
struct MemoryLedger {
entries: Mutex<Vec<(ContentKey, EvalReply)>>,
writes: Mutex<usize>,
}
impl MemoryLedger {
fn push_existing(&self, key: ContentKey, reply: EvalReply) {
self.entries.lock().unwrap().push((key, reply));
}
fn writes(&self) -> usize {
*self.writes.lock().unwrap()
}
}
impl EvalCassetteLedger for MemoryLedger {
fn append_eval_result(&self, key: &ContentKey, reply: &EvalReply) -> Result<()> {
*self.writes.lock().unwrap() += 1;
self.entries
.lock()
.unwrap()
.push((key.clone(), reply.clone()));
Ok(())
}
fn replay_eval_results(&self) -> Result<Vec<(ContentKey, EvalReply)>> {
Ok(self.entries.lock().unwrap().clone())
}
}
fn request(expr: &str) -> EvalRequest {
EvalRequest {
expr: Expr::String(expr.to_owned()),
result_shape: None,
required_capabilities: vec![CapabilityName::new("fabric.test")],
deadline: None,
consistency: Consistency::LocalFirst,
mode: EvalMode::Eval,
answer_limit: None,
stream_buffer: None,
stream: false,
trace: false,
}
}
fn key(expr: &str) -> ContentKey {
ContentKey::from_request(&request(expr))
}
fn reply(cx: &mut Cx, value: &str) -> EvalReply {
EvalReply {
value: cx.factory().string(value.to_owned()).unwrap(),
diagnostics: Vec::new(),
trace: None,
}
}
fn value_display(cx: &mut Cx, value: &Value) -> String {
value.object().display(cx).unwrap()
}
#[test]
fn record_and_get_returns_stored_reply() {
let mut cx = cx();
let ledger = Arc::new(MemoryLedger::default());
let cassette = EvalCassette::new(ledger);
let key = key("record");
let reply = reply(&mut cx, "stored");
cassette.record(key.clone(), reply.clone()).unwrap();
let stored = cassette.get(&key).unwrap();
assert_eq!(
value_display(&mut cx, &stored.value),
value_display(&mut cx, &reply.value)
);
}
#[test]
fn duplicate_record_is_idempotent() {
let mut cx = cx();
let ledger = Arc::new(MemoryLedger::default());
let cassette = EvalCassette::new(ledger.clone());
let key = key("duplicate");
let first = reply(&mut cx, "first");
cassette.record(key.clone(), first.clone()).unwrap();
cassette
.record(key.clone(), reply(&mut cx, "second"))
.unwrap();
assert_eq!(ledger.writes(), 1);
let stored = cassette.get(&key).unwrap();
assert_eq!(
value_display(&mut cx, &stored.value),
value_display(&mut cx, &first.value)
);
}
#[test]
fn from_ledger_replays_prior_entries() {
let mut cx = cx();
let ledger = Arc::new(MemoryLedger::default());
let first_key = key("first");
let second_key = key("second");
let first = reply(&mut cx, "one");
let second = reply(&mut cx, "two");
ledger.push_existing(first_key.clone(), first.clone());
ledger.push_existing(second_key.clone(), second.clone());
let cassette = EvalCassette::from_ledger(ledger).unwrap();
assert_eq!(cassette.len(), 2);
assert_eq!(
value_display(&mut cx, &cassette.get(&first_key).unwrap().value),
value_display(&mut cx, &first.value)
);
assert_eq!(
value_display(&mut cx, &cassette.get(&second_key).unwrap().value),
value_display(&mut cx, &second.value)
);
}
}