1use std::sync::Arc;
9
10use sim_kernel::{Cx, Error, EvalFabric, EvalFabricRef, EvalReply, EvalRequest, Result, Symbol};
11
12use crate::{EvalCassette, HoldingIndex, content_key::ContentKey};
13
14#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
21pub struct HeldContent {
22 pub holder: Symbol,
24 pub key: ContentKey,
26}
27
28impl HeldContent {
29 pub fn new(holder: Symbol, key: ContentKey) -> Self {
31 Self { holder, key }
32 }
33}
34
35pub struct ContentServeFabric {
42 cassette: Arc<EvalCassette>,
43}
44
45impl ContentServeFabric {
46 pub fn new(cassette: Arc<EvalCassette>) -> Self {
48 Self { cassette }
49 }
50
51 pub fn cassette(&self) -> &Arc<EvalCassette> {
53 &self.cassette
54 }
55}
56
57impl EvalFabric for ContentServeFabric {
58 fn realize(&self, _cx: &mut Cx, request: EvalRequest) -> Result<EvalReply> {
59 let key = ContentKey::from_request(&request);
60 self.cassette
61 .get(&key)
62 .ok_or_else(|| Error::Eval("content not held by this node".to_owned()))
63 }
64}
65
66pub struct ContentPeer {
68 pub node: Symbol,
70 pub serve: EvalFabricRef,
72}
73
74impl ContentPeer {
75 pub fn new(node: Symbol, serve: EvalFabricRef) -> Self {
77 Self { node, serve }
78 }
79}
80
81pub struct ContentAddressedFabric {
89 node: Symbol,
90 local: Arc<EvalCassette>,
91 peers: Vec<ContentPeer>,
92 home: Option<EvalFabricRef>,
93 holders: Arc<HoldingIndex>,
94}
95
96impl ContentAddressedFabric {
97 pub fn new(node: Symbol, local: Arc<EvalCassette>, peers: Vec<ContentPeer>) -> Self {
99 Self {
100 node,
101 local,
102 peers,
103 home: None,
104 holders: Arc::new(HoldingIndex::default()),
105 }
106 }
107
108 pub fn with_holding_index(mut self, holders: Arc<HoldingIndex>) -> Self {
110 self.holders = holders;
111 self
112 }
113
114 pub fn with_home(mut self, home: EvalFabricRef) -> Self {
116 self.home = Some(home);
117 self
118 }
119
120 pub fn node(&self) -> &Symbol {
122 &self.node
123 }
124
125 pub fn local_cassette(&self) -> &Arc<EvalCassette> {
127 &self.local
128 }
129
130 pub fn peers(&self) -> &[ContentPeer] {
132 &self.peers
133 }
134
135 pub fn holding_index(&self) -> &Arc<HoldingIndex> {
137 &self.holders
138 }
139
140 fn ordered_peers(&self, key: &ContentKey) -> Vec<&ContentPeer> {
141 let announced = self.holders.holders_of(key);
142 let mut ordered = Vec::with_capacity(self.peers.len());
143
144 for holder in &announced {
145 if holder == &self.node {
146 continue;
147 }
148 if let Some(peer) = self.peers.iter().find(|peer| &peer.node == holder) {
149 ordered.push(peer);
150 }
151 }
152
153 for peer in &self.peers {
154 if peer.node == self.node || announced.iter().any(|holder| holder == &peer.node) {
155 continue;
156 }
157 ordered.push(peer);
158 }
159
160 ordered
161 }
162
163 fn record_local_hold(&self, key: ContentKey, reply: EvalReply) -> Result<()> {
164 self.local.record(key.clone(), reply)?;
165 self.holders
166 .announce(HeldContent::new(self.node.clone(), key))
167 }
168}
169
170impl EvalFabric for ContentAddressedFabric {
171 fn realize(&self, cx: &mut Cx, request: EvalRequest) -> Result<EvalReply> {
172 let key = ContentKey::from_request(&request);
173 if let Some(reply) = self.local.get(&key) {
174 return Ok(reply);
175 }
176
177 for peer in self.ordered_peers(&key) {
178 if let Ok(reply) = peer.serve.realize(cx, request.clone()) {
179 self.record_local_hold(key, reply.clone())?;
180 return Ok(reply);
181 }
182 }
183
184 if let Some(home) = &self.home {
185 let reply = home.realize(cx, request)?;
186 self.record_local_hold(key, reply.clone())?;
187 return Ok(reply);
188 }
189 Err(Error::Eval(
190 "no holder for content id and no home compute site".to_owned(),
191 ))
192 }
193}
194
195#[cfg(test)]
196mod tests {
197 use std::sync::{Arc, Mutex};
198
199 use sim_kernel::{
200 CapabilityName, Consistency, Cx, EvalMode, EvalReply, EvalRequest, Expr, Result, Value,
201 testing::bare_cx as cx,
202 };
203
204 use crate::cassette::EvalCassetteLedger;
205
206 use super::*;
207
208 #[derive(Default)]
209 struct MemoryLedger {
210 entries: Mutex<Vec<(ContentKey, EvalReply)>>,
211 }
212
213 impl EvalCassetteLedger for MemoryLedger {
214 fn append_eval_result(&self, key: &ContentKey, reply: &EvalReply) -> Result<()> {
215 self.entries
216 .lock()
217 .unwrap()
218 .push((key.clone(), reply.clone()));
219 Ok(())
220 }
221
222 fn replay_eval_results(&self) -> Result<Vec<(ContentKey, EvalReply)>> {
223 Ok(self.entries.lock().unwrap().clone())
224 }
225 }
226
227 fn mem_ledger() -> Arc<dyn EvalCassetteLedger> {
228 Arc::new(MemoryLedger::default())
229 }
230
231 fn req(expr: &str, caps: &[&str]) -> EvalRequest {
232 EvalRequest {
233 expr: Expr::String(expr.to_owned()),
234 result_shape: None,
235 required_capabilities: caps
236 .iter()
237 .map(|capability| CapabilityName::new(*capability))
238 .collect(),
239 deadline: None,
240 consistency: Consistency::LocalFirst,
241 mode: EvalMode::Eval,
242 answer_limit: None,
243 stream_buffer: None,
244 stream: false,
245 trace: false,
246 }
247 }
248
249 fn reply(cx: &mut Cx, value: &str) -> EvalReply {
250 EvalReply {
251 value: cx.factory().string(value.to_owned()).unwrap(),
252 diagnostics: Vec::new(),
253 trace: None,
254 }
255 }
256
257 fn value_display(cx: &mut Cx, value: &Value) -> String {
258 value.object().display(cx).unwrap()
259 }
260
261 #[test]
262 fn a_datum_is_the_same_from_any_node() {
263 let key_a = ContentKey::from_request(&req("shared", &["fabric.test"]));
264 let key_b = ContentKey::from_request(&req("shared", &["fabric.test"]));
265 assert_eq!(key_a, key_b, "same request must key the same everywhere");
266
267 let mut cx = cx();
268 let reply = reply(&mut cx, "v");
269 let node_a = EvalCassette::new(mem_ledger());
270 let node_b = EvalCassette::new(mem_ledger());
271 node_a.record(key_a.clone(), reply.clone()).unwrap();
272 node_b.record(key_b.clone(), reply.clone()).unwrap();
273
274 let stored_a = node_a.get(&key_a).unwrap();
275 let stored_b = node_b.get(&key_b).unwrap();
276 assert_eq!(stored_a.value, stored_b.value);
277 assert_eq!(
278 value_display(&mut cx, &stored_a.value),
279 value_display(&mut cx, &stored_b.value)
280 );
281 }
282}