use std::sync::Arc;
use sim_kernel::{Cx, Error, EvalFabric, EvalFabricRef, EvalReply, EvalRequest, Result, Symbol};
use crate::{EvalCassette, HoldingIndex, content_key::ContentKey};
/// Announcement record pairing a holder with the content key it holds.
///
/// The derived ordering compares `holder` first and `key` second, following
/// field declaration order.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct HeldContent {
/// Symbol identifying the holder of the content.
pub holder: Symbol,
/// Key of the held content.
pub key: ContentKey,
}
impl HeldContent {
pub fn new(holder: Symbol, key: ContentKey) -> Self {
Self { holder, key }
}
}
/// Fabric that answers requests only from the replies stored in a cassette.
pub struct ContentServeFabric {
// Shared cassette consulted on every `realize` call.
cassette: Arc<EvalCassette>,
}
impl ContentServeFabric {
pub fn new(cassette: Arc<EvalCassette>) -> Self {
Self { cassette }
}
pub fn cassette(&self) -> &Arc<EvalCassette> {
&self.cassette
}
}
impl EvalFabric for ContentServeFabric {
fn realize(&self, _cx: &mut Cx, request: EvalRequest) -> Result<EvalReply> {
let key = ContentKey::from_request(&request);
self.cassette
.get(&key)
.ok_or_else(|| Error::Eval("content not held by this node".to_owned()))
}
}
/// A peer node together with the fabric used to request content from it.
pub struct ContentPeer {
/// Symbol identifying the peer node.
pub node: Symbol,
/// Fabric whose `realize` is called to ask this peer for content.
pub serve: EvalFabricRef,
}
impl ContentPeer {
pub fn new(node: Symbol, serve: EvalFabricRef) -> Self {
Self { node, serve }
}
}
/// Fabric that resolves a request by content key: local cassette first, then
/// peers, then an optional home fabric.
pub struct ContentAddressedFabric {
// Symbol of this node; used to skip itself among peers and as the holder
// name in announcements.
node: Symbol,
// Cassette checked first and updated after a successful remote fetch.
local: Arc<EvalCassette>,
// Peers that may be asked for content.
peers: Vec<ContentPeer>,
// Optional fallback fabric tried after every peer has failed.
home: Option<EvalFabricRef>,
// Index of announced holders; consulted to order peers and updated on
// every local hold.
holders: Arc<HoldingIndex>,
}
impl ContentAddressedFabric {
pub fn new(node: Symbol, local: Arc<EvalCassette>, peers: Vec<ContentPeer>) -> Self {
Self {
node,
local,
peers,
home: None,
holders: Arc::new(HoldingIndex::default()),
}
}
pub fn with_holding_index(mut self, holders: Arc<HoldingIndex>) -> Self {
self.holders = holders;
self
}
pub fn with_home(mut self, home: EvalFabricRef) -> Self {
self.home = Some(home);
self
}
pub fn node(&self) -> &Symbol {
&self.node
}
pub fn local_cassette(&self) -> &Arc<EvalCassette> {
&self.local
}
pub fn peers(&self) -> &[ContentPeer] {
&self.peers
}
pub fn holding_index(&self) -> &Arc<HoldingIndex> {
&self.holders
}
fn ordered_peers(&self, key: &ContentKey) -> Vec<&ContentPeer> {
let announced = self.holders.holders_of(key);
let mut ordered = Vec::with_capacity(self.peers.len());
for holder in &announced {
if holder == &self.node {
continue;
}
if let Some(peer) = self.peers.iter().find(|peer| &peer.node == holder) {
ordered.push(peer);
}
}
for peer in &self.peers {
if peer.node == self.node || announced.iter().any(|holder| holder == &peer.node) {
continue;
}
ordered.push(peer);
}
ordered
}
fn record_local_hold(&self, key: ContentKey, reply: EvalReply) -> Result<()> {
self.local.record(key.clone(), reply)?;
self.holders
.announce(HeldContent::new(self.node.clone(), key))
}
}
impl EvalFabric for ContentAddressedFabric {
fn realize(&self, cx: &mut Cx, request: EvalRequest) -> Result<EvalReply> {
let key = ContentKey::from_request(&request);
if let Some(reply) = self.local.get(&key) {
return Ok(reply);
}
for peer in self.ordered_peers(&key) {
if let Ok(reply) = peer.serve.realize(cx, request.clone()) {
self.record_local_hold(key, reply.clone())?;
return Ok(reply);
}
}
if let Some(home) = &self.home {
let reply = home.realize(cx, request)?;
self.record_local_hold(key, reply.clone())?;
return Ok(reply);
}
Err(Error::Eval(
"no holder for content id and no home compute site".to_owned(),
))
}
}
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use sim_kernel::{
        CapabilityName, Consistency, Cx, EvalMode, EvalReply, EvalRequest, Expr, Result, Value,
        testing::bare_cx as cx,
    };

    use crate::cassette::EvalCassetteLedger;

    use super::*;

    /// Ledger that keeps every appended entry in memory.
    #[derive(Default)]
    struct MemoryLedger {
        entries: Mutex<Vec<(ContentKey, EvalReply)>>,
    }

    impl EvalCassetteLedger for MemoryLedger {
        fn append_eval_result(&self, key: &ContentKey, reply: &EvalReply) -> Result<()> {
            let mut log = self.entries.lock().unwrap();
            log.push((key.clone(), reply.clone()));
            Ok(())
        }

        fn replay_eval_results(&self) -> Result<Vec<(ContentKey, EvalReply)>> {
            let log = self.entries.lock().unwrap();
            Ok(log.clone())
        }
    }

    /// Fresh, empty in-memory ledger behind the trait object the cassette takes.
    fn mem_ledger() -> Arc<dyn EvalCassetteLedger> {
        let ledger = MemoryLedger::default();
        Arc::new(ledger)
    }

    /// Builds an eval request for the string expression `expr` that requires
    /// the capabilities named in `caps`; every other field is fixed.
    fn req(expr: &str, caps: &[&str]) -> EvalRequest {
        let required_capabilities = caps
            .iter()
            .map(|name| CapabilityName::new(*name))
            .collect();
        EvalRequest {
            expr: Expr::String(expr.to_owned()),
            result_shape: None,
            required_capabilities,
            deadline: None,
            consistency: Consistency::LocalFirst,
            mode: EvalMode::Eval,
            answer_limit: None,
            stream_buffer: None,
            stream: false,
            trace: false,
        }
    }

    /// Builds a reply whose value is the string `value`, with no diagnostics
    /// and no trace.
    fn reply(cx: &mut Cx, value: &str) -> EvalReply {
        let value = cx.factory().string(value.to_owned()).unwrap();
        EvalReply {
            value,
            diagnostics: Vec::new(),
            trace: None,
        }
    }

    /// Renders `value` through its object's `display`.
    fn value_display(cx: &mut Cx, value: &Value) -> String {
        value.object().display(cx).unwrap()
    }

    #[test]
    fn a_datum_is_the_same_from_any_node() {
        // Two independently built but identical requests must share one key.
        let request_on_a = req("shared", &["fabric.test"]);
        let request_on_b = req("shared", &["fabric.test"]);
        let key_a = ContentKey::from_request(&request_on_a);
        let key_b = ContentKey::from_request(&request_on_b);
        assert_eq!(key_a, key_b, "same request must key the same everywhere");

        let mut ctx = cx();
        let datum = reply(&mut ctx, "v");

        // Each node owns a separate cassette and ledger.
        let node_a = EvalCassette::new(mem_ledger());
        let node_b = EvalCassette::new(mem_ledger());
        node_a.record(key_a.clone(), datum.clone()).unwrap();
        node_b.record(key_b.clone(), datum.clone()).unwrap();

        let stored_a = node_a.get(&key_a).unwrap();
        let stored_b = node_b.get(&key_b).unwrap();
        assert_eq!(stored_a.value, stored_b.value);

        let shown_a = value_display(&mut ctx, &stored_a.value);
        let shown_b = value_display(&mut ctx, &stored_b.value);
        assert_eq!(shown_a, shown_b);
    }
}