Skip to main content

sim_lib_stream_fabric/
store.rs

1//! Content-addressed store contract types.
2//!
3//! The store layer keeps routing out of caller code. A request maps to a
4//! [`ContentKey`], and any holder of that key can answer through the standard
5//! [`EvalFabric`] surface. Local code still calls `realize`; transport handles
6//! are wrapped as peer fabrics below this API.
7
8use std::sync::Arc;
9
10use sim_kernel::{Cx, Error, EvalFabric, EvalFabricRef, EvalReply, EvalRequest, Result, Symbol};
11
12use crate::{EvalCassette, HoldingIndex, content_key::ContentKey};
13
14/// A claim that one node holds one content id in its cassette.
15///
16/// This is the complete topology fact for the content-addressed store: not a
17/// route, not a partition, just an immutable statement that `holder` can answer
18/// any request whose [`ContentKey`] equals `key`. Because content never mutates,
19/// a `HeldContent` fact is monotonic and never invalidated.
20#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
21pub struct HeldContent {
22    /// The node that holds the content.
23    pub holder: Symbol,
24    /// The content-addressed request key held by the node.
25    pub key: ContentKey,
26}
27
28impl HeldContent {
29    /// Creates a held-content claim for `holder` and `key`.
30    pub fn new(holder: Symbol, key: ContentKey) -> Self {
31        Self { holder, key }
32    }
33}
34
35/// Read side of the content-addressed store for one node.
36///
37/// This fabric answers a request only when its cassette already holds the
38/// request's [`ContentKey`]. It never computes, never contacts another node,
39/// and never invents a route; it is the question "do you hold content id X"
40/// exposed as an [`EvalFabric`].
41pub struct ContentServeFabric {
42    cassette: Arc<EvalCassette>,
43}
44
45impl ContentServeFabric {
46    /// Builds a serve-only fabric over `cassette`.
47    pub fn new(cassette: Arc<EvalCassette>) -> Self {
48        Self { cassette }
49    }
50
51    /// Returns the cassette this serve fabric reads from.
52    pub fn cassette(&self) -> &Arc<EvalCassette> {
53        &self.cassette
54    }
55}
56
57impl EvalFabric for ContentServeFabric {
58    fn realize(&self, _cx: &mut Cx, request: EvalRequest) -> Result<EvalReply> {
59        let key = ContentKey::from_request(&request);
60        self.cassette
61            .get(&key)
62            .ok_or_else(|| Error::Eval("content not held by this node".to_owned()))
63    }
64}
65
66/// A peer that may hold content in its local cassette.
67pub struct ContentPeer {
68    /// Stable node id for diagnostics and later holder-index announcements.
69    pub node: Symbol,
70    /// Serve-only fabric for the peer's held content.
71    pub serve: EvalFabricRef,
72}
73
74impl ContentPeer {
75    /// Builds a peer handle from `node` and its serve fabric.
76    pub fn new(node: Symbol, serve: EvalFabricRef) -> Self {
77        Self { node, serve }
78    }
79}
80
81/// Content-addressed lookup fabric over local and peer cassettes.
82///
83/// This presents the fleet as one [`EvalFabric`]. Lookup checks the local
84/// cassette first, asks peers announced by the [`HoldingIndex`] before falling
85/// back to the configured peer order, records the first immutable reply locally,
86/// and computes on the configured home fabric only when no holder answers.
87/// Callers never select a node or a transport; "routing" is content lookup.
88pub struct ContentAddressedFabric {
89    node: Symbol,
90    local: Arc<EvalCassette>,
91    peers: Vec<ContentPeer>,
92    home: Option<EvalFabricRef>,
93    holders: Arc<HoldingIndex>,
94}
95
96impl ContentAddressedFabric {
97    /// Builds a content-addressed lookup fabric for `node`.
98    pub fn new(node: Symbol, local: Arc<EvalCassette>, peers: Vec<ContentPeer>) -> Self {
99        Self {
100            node,
101            local,
102            peers,
103            home: None,
104            holders: Arc::new(HoldingIndex::default()),
105        }
106    }
107
108    /// Uses `holders` as this node's grow-only holder index.
109    pub fn with_holding_index(mut self, holders: Arc<HoldingIndex>) -> Self {
110        self.holders = holders;
111        self
112    }
113
114    /// Records a home compute fabric handle for the miss path.
115    pub fn with_home(mut self, home: EvalFabricRef) -> Self {
116        self.home = Some(home);
117        self
118    }
119
120    /// Returns this node's stable id.
121    pub fn node(&self) -> &Symbol {
122        &self.node
123    }
124
125    /// Returns the local cassette used for held-content lookup and caching.
126    pub fn local_cassette(&self) -> &Arc<EvalCassette> {
127        &self.local
128    }
129
130    /// Returns the configured peer list.
131    pub fn peers(&self) -> &[ContentPeer] {
132        &self.peers
133    }
134
135    /// Returns this node's grow-only holder index.
136    pub fn holding_index(&self) -> &Arc<HoldingIndex> {
137        &self.holders
138    }
139
140    fn ordered_peers(&self, key: &ContentKey) -> Vec<&ContentPeer> {
141        let announced = self.holders.holders_of(key);
142        let mut ordered = Vec::with_capacity(self.peers.len());
143
144        for holder in &announced {
145            if holder == &self.node {
146                continue;
147            }
148            if let Some(peer) = self.peers.iter().find(|peer| &peer.node == holder) {
149                ordered.push(peer);
150            }
151        }
152
153        for peer in &self.peers {
154            if peer.node == self.node || announced.iter().any(|holder| holder == &peer.node) {
155                continue;
156            }
157            ordered.push(peer);
158        }
159
160        ordered
161    }
162
163    fn record_local_hold(&self, key: ContentKey, reply: EvalReply) -> Result<()> {
164        self.local.record(key.clone(), reply)?;
165        self.holders
166            .announce(HeldContent::new(self.node.clone(), key))
167    }
168}
169
170impl EvalFabric for ContentAddressedFabric {
171    fn realize(&self, cx: &mut Cx, request: EvalRequest) -> Result<EvalReply> {
172        let key = ContentKey::from_request(&request);
173        if let Some(reply) = self.local.get(&key) {
174            return Ok(reply);
175        }
176
177        for peer in self.ordered_peers(&key) {
178            if let Ok(reply) = peer.serve.realize(cx, request.clone()) {
179                self.record_local_hold(key, reply.clone())?;
180                return Ok(reply);
181            }
182        }
183
184        if let Some(home) = &self.home {
185            let reply = home.realize(cx, request)?;
186            self.record_local_hold(key, reply.clone())?;
187            return Ok(reply);
188        }
189        Err(Error::Eval(
190            "no holder for content id and no home compute site".to_owned(),
191        ))
192    }
193}
194
195#[cfg(test)]
196mod tests {
197    use std::sync::{Arc, Mutex};
198
199    use sim_kernel::{
200        CapabilityName, Consistency, Cx, EvalMode, EvalReply, EvalRequest, Expr, Result, Value,
201        testing::bare_cx as cx,
202    };
203
204    use crate::cassette::EvalCassetteLedger;
205
206    use super::*;
207
208    #[derive(Default)]
209    struct MemoryLedger {
210        entries: Mutex<Vec<(ContentKey, EvalReply)>>,
211    }
212
213    impl EvalCassetteLedger for MemoryLedger {
214        fn append_eval_result(&self, key: &ContentKey, reply: &EvalReply) -> Result<()> {
215            self.entries
216                .lock()
217                .unwrap()
218                .push((key.clone(), reply.clone()));
219            Ok(())
220        }
221
222        fn replay_eval_results(&self) -> Result<Vec<(ContentKey, EvalReply)>> {
223            Ok(self.entries.lock().unwrap().clone())
224        }
225    }
226
227    fn mem_ledger() -> Arc<dyn EvalCassetteLedger> {
228        Arc::new(MemoryLedger::default())
229    }
230
231    fn req(expr: &str, caps: &[&str]) -> EvalRequest {
232        EvalRequest {
233            expr: Expr::String(expr.to_owned()),
234            result_shape: None,
235            required_capabilities: caps
236                .iter()
237                .map(|capability| CapabilityName::new(*capability))
238                .collect(),
239            deadline: None,
240            consistency: Consistency::LocalFirst,
241            mode: EvalMode::Eval,
242            answer_limit: None,
243            stream_buffer: None,
244            stream: false,
245            trace: false,
246        }
247    }
248
249    fn reply(cx: &mut Cx, value: &str) -> EvalReply {
250        EvalReply {
251            value: cx.factory().string(value.to_owned()).unwrap(),
252            diagnostics: Vec::new(),
253            trace: None,
254        }
255    }
256
257    fn value_display(cx: &mut Cx, value: &Value) -> String {
258        value.object().display(cx).unwrap()
259    }
260
261    #[test]
262    fn a_datum_is_the_same_from_any_node() {
263        let key_a = ContentKey::from_request(&req("shared", &["fabric.test"]));
264        let key_b = ContentKey::from_request(&req("shared", &["fabric.test"]));
265        assert_eq!(key_a, key_b, "same request must key the same everywhere");
266
267        let mut cx = cx();
268        let reply = reply(&mut cx, "v");
269        let node_a = EvalCassette::new(mem_ledger());
270        let node_b = EvalCassette::new(mem_ledger());
271        node_a.record(key_a.clone(), reply.clone()).unwrap();
272        node_b.record(key_b.clone(), reply.clone()).unwrap();
273
274        let stored_a = node_a.get(&key_a).unwrap();
275        let stored_b = node_b.get(&key_b).unwrap();
276        assert_eq!(stored_a.value, stored_b.value);
277        assert_eq!(
278            value_display(&mut cx, &stored_a.value),
279            value_display(&mut cx, &stored_b.value)
280        );
281    }
282}