Skip to main content

sim_lib_stream_fabric/
ledgered_relay.rs

1//! Cache-first relay fabric backed by an eval cassette.
2
3use std::sync::Arc;
4
5use sim_kernel::{Cx, EvalFabric, EvalReply, EvalRequest, Object, ObjectCompat, Result, Value};
6
7use crate::{ContentKey, EvalCassette};
8
9/// A cache-first [`EvalFabric`] that records successful replies in a cassette.
10///
11/// [`LedgeredRelayFabric`] derives a [`ContentKey`] from each request, returns a
12/// cached reply without touching the inner fabric when one exists, and writes
13/// every successful miss-through reply to the cassette. Capability policy stays
14/// entirely inside the inner fabric: errors, including capability denials, are
15/// propagated without recording a cassette result.
16pub struct LedgeredRelayFabric<F> {
17    inner: F,
18    cassette: Arc<EvalCassette>,
19}
20
21impl<F: EvalFabric> LedgeredRelayFabric<F> {
22    /// Builds a ledgered relay around `inner` and `cassette`.
23    pub fn new(inner: F, cassette: Arc<EvalCassette>) -> Self {
24        Self { inner, cassette }
25    }
26
27    /// Returns the cassette used for cache lookups and write-through records.
28    pub fn cassette(&self) -> &Arc<EvalCassette> {
29        &self.cassette
30    }
31
32    /// Returns the wrapped fabric.
33    pub fn inner(&self) -> &F {
34        &self.inner
35    }
36}
37
38impl<F: EvalFabric + 'static> EvalFabric for LedgeredRelayFabric<F> {
39    fn realize(&self, cx: &mut Cx, request: EvalRequest) -> Result<EvalReply> {
40        let key = ContentKey::from_request(&request);
41        if let Some(cached) = self.cassette.get(&key) {
42            return Ok(cached);
43        }
44
45        let reply = self.inner.realize(cx, request)?;
46        self.cassette.record(key, reply.clone())?;
47        Ok(reply)
48    }
49}
50
51impl<F: EvalFabric + 'static> Object for LedgeredRelayFabric<F> {
52    fn display(&self, _cx: &mut Cx) -> Result<String> {
53        Ok("#<ledgered-relay-fabric>".to_owned())
54    }
55
56    fn as_any(&self) -> &dyn std::any::Any {
57        self
58    }
59}
60
61impl<F: EvalFabric + 'static> ObjectCompat for LedgeredRelayFabric<F> {
62    fn as_eval_fabric(&self) -> Option<&dyn EvalFabric> {
63        Some(self)
64    }
65}
66
67/// Wraps `inner` and `cassette` as an opaque runtime value.
68///
69/// # Errors
70///
71/// Returns an error when the active factory cannot allocate the opaque value.
72pub fn ledgered_relay_fabric_value<F: EvalFabric + 'static>(
73    cx: &mut Cx,
74    inner: F,
75    cassette: Arc<EvalCassette>,
76) -> Result<Value> {
77    cx.factory()
78        .opaque(Arc::new(LedgeredRelayFabric::new(inner, cassette)))
79}
80
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use sim_kernel::{
        CapabilityName, Consistency, Cx, Error, EvalFabric, EvalMode, EvalReply, EvalRequest, Expr,
        Result, Value, testing::bare_cx as cx,
    };

    use crate::{EvalCassette, EvalCassetteLedger};

    use super::{LedgeredRelayFabric, ledgered_relay_fabric_value};

    /// Test fabric that hands out scripted replies and counts its invocations.
    struct CountingFabric {
        /// Replies returned in order; the last one repeats once exhausted.
        replies: Vec<EvalReply>,
        /// Shared invocation counter the test inspects afterwards.
        calls: Arc<Mutex<usize>>,
    }

    impl CountingFabric {
        fn new(replies: Vec<EvalReply>, calls: Arc<Mutex<usize>>) -> Self {
            CountingFabric { replies, calls }
        }
    }

    impl EvalFabric for CountingFabric {
        fn realize(&self, _cx: &mut Cx, _request: EvalRequest) -> Result<EvalReply> {
            let mut seen = self.calls.lock().unwrap();
            // Use the reply at the current call index, falling back to the
            // final scripted reply when the index runs past the end.
            let scripted = match self.replies.get(*seen) {
                Some(indexed) => Some(indexed),
                None => self.replies.last(),
            };
            let reply = scripted
                .expect("counting fabric must have at least one reply")
                .clone();
            *seen += 1;
            Ok(reply)
        }
    }

    /// Test fabric that rejects every request with a capability denial naming
    /// the request's first required capability.
    struct DenyAll;

    impl EvalFabric for DenyAll {
        fn realize(&self, _cx: &mut Cx, request: EvalRequest) -> Result<EvalReply> {
            let denied = request.required_capabilities[0].clone();
            Err(Error::CapabilityDenied { capability: denied })
        }
    }

    /// In-memory ledger that keeps appended entries in a mutex-guarded vector.
    #[derive(Default)]
    struct MemoryLedger {
        entries: Mutex<Vec<(crate::ContentKey, EvalReply)>>,
    }

    impl EvalCassetteLedger for MemoryLedger {
        fn append_eval_result(&self, key: &crate::ContentKey, reply: &EvalReply) -> Result<()> {
            let mut log = self.entries.lock().unwrap();
            log.push((key.clone(), reply.clone()));
            Ok(())
        }

        fn replay_eval_results(&self) -> Result<Vec<(crate::ContentKey, EvalReply)>> {
            let log = self.entries.lock().unwrap();
            Ok(log.clone())
        }
    }

    /// Builds a cassette backed by a fresh, empty [`MemoryLedger`].
    fn cassette() -> Arc<EvalCassette> {
        let ledger = Arc::new(MemoryLedger::default());
        Arc::new(EvalCassette::new(ledger))
    }

    /// Builds a request for `expr` that requires the `fabric.test` capability.
    fn request(expr: &str) -> EvalRequest {
        request_with_cap(expr, "fabric.test")
    }

    /// Builds a request for `expr` that requires exactly the capability `cap`.
    fn request_with_cap(expr: &str, cap: &str) -> EvalRequest {
        let expr = Expr::String(String::from(expr));
        let required_capabilities = vec![CapabilityName::new(cap)];
        EvalRequest {
            expr,
            result_shape: None,
            required_capabilities,
            deadline: None,
            consistency: Consistency::LocalFirst,
            mode: EvalMode::Eval,
            answer_limit: None,
            stream_buffer: None,
            stream: false,
            trace: false,
        }
    }

    /// Builds a reply whose value is the string `value`, with no diagnostics
    /// and no trace.
    fn reply(cx: &mut Cx, value: &str) -> EvalReply {
        let value = cx.factory().string(String::from(value)).unwrap();
        EvalReply {
            value,
            diagnostics: Vec::new(),
            trace: None,
        }
    }

    /// Renders `value` through its object's `display` method.
    fn value_display(cx: &mut Cx, value: &Value) -> String {
        let object = value.object();
        object.display(cx).unwrap()
    }

    #[test]
    fn ledgered_relay_cache_miss_calls_inner_once_then_cache_hit_skips_inner() {
        let mut cx = cx();
        let calls = Arc::new(Mutex::new(0));
        let scripted = vec![reply(&mut cx, "first")];
        let counting = CountingFabric::new(scripted, Arc::clone(&calls));
        let fabric = LedgeredRelayFabric::new(counting, cassette());
        let req = request("same-expr");

        let miss = fabric.realize(&mut cx, req.clone()).unwrap();
        let hit = fabric.realize(&mut cx, req).unwrap();

        // Only the first call may have reached the wrapped fabric.
        assert_eq!(*calls.lock().unwrap(), 1);
        let hit_text = value_display(&mut cx, &hit.value);
        let miss_text = value_display(&mut cx, &miss.value);
        assert_eq!(hit_text, miss_text);
    }

    #[test]
    fn ledgered_relay_different_expressions_each_reach_inner_once() {
        let mut cx = cx();
        let calls = Arc::new(Mutex::new(0));
        let scripted = vec![reply(&mut cx, "first"), reply(&mut cx, "second")];
        let counting = CountingFabric::new(scripted, Arc::clone(&calls));
        let fabric = LedgeredRelayFabric::new(counting, cassette());

        for expr in ["expr-a", "expr-b"] {
            fabric.realize(&mut cx, request(expr)).unwrap();
        }

        assert_eq!(*calls.lock().unwrap(), 2);
    }

    #[test]
    fn ledgered_relay_capability_error_is_not_recorded_in_cassette() {
        let tape = cassette();
        let fabric = LedgeredRelayFabric::new(DenyAll, Arc::clone(&tape));
        let mut cx = cx();

        let outcome = fabric.realize(&mut cx, request_with_cap("blocked", "fabric.denied"));
        let err = match outcome {
            Err(err) => err,
            Ok(_) => panic!("capability error must propagate"),
        };

        assert!(matches!(err, Error::CapabilityDenied { .. }));
        assert_eq!(tape.len(), 0);
    }

    #[test]
    fn ledgered_relay_value_exposes_eval_fabric() {
        let mut cx = cx();
        let scripted = vec![reply(&mut cx, "value")];
        let counting = CountingFabric::new(scripted, Arc::new(Mutex::new(0)));
        let value = ledgered_relay_fabric_value(&mut cx, counting, cassette()).unwrap();

        let as_fabric = value.object().as_eval_fabric();
        assert!(as_fabric.is_some());
    }
}
239}