use std::collections::BTreeSet;
use sim_kernel::{
CapabilityName, Cx, Error, EvalFabric, EvalFabricRef, EvalReply, EvalRequest, Result,
};
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RelayStatus {
Connected,
Reconnecting,
Disconnected,
}
pub struct RelayFabric {
inner: EvalFabricRef,
allowed: BTreeSet<CapabilityName>,
status: RelayStatus,
}
impl RelayFabric {
pub fn new(inner: EvalFabricRef, allowed: Vec<CapabilityName>) -> Self {
Self {
inner,
allowed: allowed.into_iter().collect(),
status: RelayStatus::Connected,
}
}
pub fn status(&self) -> RelayStatus {
self.status
}
pub fn allows(&self, capability: &CapabilityName) -> bool {
self.allowed.contains(capability)
}
pub fn disconnect(&mut self) {
self.status = RelayStatus::Disconnected;
}
pub fn begin_reconnect(&mut self) {
self.status = RelayStatus::Reconnecting;
}
pub fn reconnect(&mut self) {
self.status = RelayStatus::Connected;
}
}
impl EvalFabric for RelayFabric {
fn realize(&self, cx: &mut Cx, request: EvalRequest) -> Result<EvalReply> {
match self.status {
RelayStatus::Connected => {}
RelayStatus::Reconnecting => {
return Err(Error::Eval(
"relay reconnecting: cannot realize until the link is restored".to_owned(),
));
}
RelayStatus::Disconnected => {
return Err(Error::Eval(
"relay disconnected: cannot realize over a severed relay link".to_owned(),
));
}
}
for capability in &request.required_capabilities {
if !self.allowed.contains(capability) {
return Err(Error::CapabilityDenied {
capability: capability.clone(),
});
}
}
self.inner.realize(cx, request)
}
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
use sim_kernel::{
CapabilityName, Consistency, Cx, Error, EvalFabric, EvalMode, EvalReply, EvalRequest, Expr,
Result, Value,
};
use super::{RelayFabric, RelayStatus};
struct RecordingFabric {
reply: EvalReply,
seen: Mutex<Vec<Vec<CapabilityName>>>,
}
impl EvalFabric for RecordingFabric {
fn realize(&self, _cx: &mut Cx, request: EvalRequest) -> Result<EvalReply> {
self.seen
.lock()
.expect("seen mutex")
.push(request.required_capabilities.clone());
Ok(self.reply.clone())
}
}
use sim_kernel::testing::bare_cx as cx;
fn recording(value: Value) -> Arc<RecordingFabric> {
Arc::new(RecordingFabric {
reply: EvalReply {
value,
diagnostics: Vec::new(),
trace: None,
},
seen: Mutex::new(Vec::new()),
})
}
fn request(caps: Vec<CapabilityName>) -> EvalRequest {
EvalRequest {
expr: Expr::Nil,
result_shape: None,
required_capabilities: caps,
deadline: None,
consistency: Consistency::LocalFirst,
mode: EvalMode::Eval,
answer_limit: None,
stream_buffer: None,
stream: false,
trace: false,
}
}
#[test]
fn allowed_request_passes_through_to_inner_reply() {
let mut cx = cx();
let value = cx.factory().string("ok".to_owned()).unwrap();
let cap = CapabilityName::new("relay.allowed");
let inner = recording(value.clone());
let relay = RelayFabric::new(inner.clone(), vec![cap.clone()]);
let reply = relay.realize(&mut cx, request(vec![cap.clone()])).unwrap();
assert_eq!(reply.value, value);
assert!(reply.diagnostics.is_empty());
let seen = inner.seen.lock().unwrap();
assert_eq!(seen.as_slice(), &[vec![cap]]);
}
#[test]
fn relay_has_no_caching_layer() {
let mut cx = cx();
let allowed = CapabilityName::new("relay.allowed");
let value = cx.factory().string("no-cache".to_owned()).unwrap();
let inner = recording(value);
let relay = RelayFabric::new(inner.clone(), vec![allowed.clone()]);
relay
.realize(&mut cx, request(vec![allowed.clone()]))
.unwrap();
relay.realize(&mut cx, request(vec![allowed])).unwrap();
assert_eq!(
inner.seen.lock().unwrap().len(),
2,
"relay must not cache; every call must reach inner"
);
}
#[test]
fn refused_capability_fails_closed_without_reaching_inner() {
let mut cx = cx();
let value = cx.factory().nil().unwrap();
let allowed = CapabilityName::new("relay.allowed");
let refused = CapabilityName::new("relay.refused");
let inner = recording(value);
let relay = RelayFabric::new(inner.clone(), vec![allowed]);
let err = relay
.realize(&mut cx, request(vec![refused.clone()]))
.err()
.expect("refused request must fail closed");
match err {
Error::CapabilityDenied { capability } => assert_eq!(capability, refused),
other => panic!("expected CapabilityDenied naming the refused capability, got {other}"),
}
assert!(inner.seen.lock().unwrap().is_empty());
}
#[test]
fn disconnect_fails_closed_then_reconnect_restores_connectivity() {
let mut cx = cx();
let value = cx.factory().string("reconnected".to_owned()).unwrap();
let cap = CapabilityName::new("relay.allowed");
let inner = recording(value.clone());
let mut relay = RelayFabric::new(inner.clone(), vec![cap.clone()]);
relay.disconnect();
assert_eq!(relay.status(), RelayStatus::Disconnected);
let err = relay
.realize(&mut cx, request(vec![cap.clone()]))
.err()
.expect("disconnected relay must fail closed");
assert!(matches!(err, Error::Eval(message) if message.contains("relay disconnected")));
assert!(inner.seen.lock().unwrap().is_empty());
relay.begin_reconnect();
assert_eq!(relay.status(), RelayStatus::Reconnecting);
assert!(relay.realize(&mut cx, request(vec![cap.clone()])).is_err());
relay.reconnect();
assert_eq!(relay.status(), RelayStatus::Connected);
let reply = relay.realize(&mut cx, request(vec![cap.clone()])).unwrap();
assert_eq!(reply.value, value);
relay.disconnect();
relay.begin_reconnect();
relay.reconnect();
assert_eq!(relay.status(), RelayStatus::Connected);
relay.realize(&mut cx, request(vec![cap])).unwrap();
assert_eq!(inner.seen.lock().unwrap().len(), 2);
}
}