sim_lib_stream_fabric/relay.rs
1//! Remote and relayed surfaces over the kernel [`EvalFabric`].
2//!
3//! [`EvalFabric`] is location-transparent by design: a caller submits an
4//! [`EvalRequest`] and receives an [`EvalReply`] without knowing whether the
5//! work ran locally or across a network. A REMOTE or RELAYED surface is
6//! therefore just another [`EvalFabric`] implementation that forwards to an
7//! inner [`EvalFabricRef`] living on the far side of a link.
8//!
9//! [`RelayFabric`] models that far-side node -- for example a phone bridging a
10//! watch's hardware to SIM. It is a CONNECTIVITY gate, not a resume engine. Its
11//! load-bearing contract is twofold:
12//!
13//! - **Location transparency when connected.** When the link is up and policy
14//! allows the request, [`RelayFabric::realize`] returns exactly the inner
15//! fabric's reply.
16//! - **No capability escalation.** A relay can only let through what its own
17//! policy already allows. A request that names a capability outside the
18//! relay's allowed set is refused fail-closed, naming the capability, and the
19//! inner fabric is never reached.
20//!
21//! # No resume; at-most-once per attempt
22//!
23//! The relay does NOT recover in-flight work across a link drop. Each
24//! [`RelayFabric::realize`] call is a single attempt with at-most-once
25//! semantics: it either reaches the inner fabric and returns its reply, or it
26//! returns `Err`. A request whose link drops mid-flight returns an error and is
27//! NOT auto-resumed or replayed; the caller decides whether to retry, and a
28//! retry is a fresh attempt with no deduplication here. [`RelayFabric::reconnect`]
29//! only restores the link to [`RelayStatus::Connected`] so that subsequent
30//! attempts may proceed; it carries no session state, sequence, or idempotency
31//! token. Idempotency and exactly-once delivery, if needed, are the far-side
32//! fabric's responsibility, not the relay's.
33//!
34//! # Backpressure
35//!
36//! A relay carries a surface session's stream traffic over the crate's existing
37//! bounded-frame discipline; it does not re-implement streaming. The frame codec
38//! applies [`crate::StreamFrameLimits`], which caps in-flight frames, total
39//! frames, payload size, duration, and rate. Once a bound is crossed the encoder
40//! appends a limit diagnostic frame and TRUNCATES -- it stops emitting further
41//! frames. It does NOT shed stale frames or keep the newest: there is no ring
42//! buffer, so a lagging consumer sees a truncated stream with a diagnostic, not
43//! a window of the most recent frames.
44
45use std::collections::BTreeSet;
46
47use sim_kernel::{
48 CapabilityName, Cx, Error, EvalFabric, EvalFabricRef, EvalReply, EvalRequest, Result,
49};
50
/// The connection state of a [`RelayFabric`] link.
///
/// Only [`RelayStatus::Connected`] lets a request through; in the other two
/// states the relay fails closed.
///
/// The type is a small `Copy` value. It also derives `Hash`, so a status can be
/// used directly as a key in hashed collections (for example when tallying how
/// often each link state was observed).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum RelayStatus {
    /// The link is up; requests may be served.
    Connected,
    /// The link dropped and a reconnect is in progress; requests fail closed.
    Reconnecting,
    /// The link is severed; requests fail closed.
    Disconnected,
}
61
62/// A remote or relayed [`EvalFabric`] node carrying a surface session.
63///
64/// `RelayFabric` wraps an inner [`EvalFabricRef`] reached across a link and
65/// enforces a non-escalating capability policy in front of it. Because
66/// [`EvalFabric`] is location-transparent, code that targets the `realize`
67/// surface treats a relay exactly like a local fabric; the relay differs only
68/// in that it can refuse and that its link can drop (failing closed, with no
69/// resume).
70///
71/// The relay preserves each request's [`Consistency`](sim_kernel::Consistency)
72/// unchanged and lets the inner fabric honor it: the inner node beyond the link
73/// is the remote authority, so a relay boundary is naturally a
74/// `RemoteOnly`/`LocalFirst` hop, but the relay never rewrites the caller's
75/// declared consistency.
76///
77/// # Trust of declared capabilities
78///
79/// The capability gate matches each request's self-declared
80/// [`EvalRequest::required_capabilities`] against the allowed set. The relay
81/// therefore PRESUMES truthful capability declaration: it is a connectivity
82/// gate, not the enforcement authority. A caller that under-declares its
83/// capabilities can pass the relay; the inner / far-side fabric, which performs
84/// the real operations behind its own `cx.require(...)` checks, is the actual
85/// authority. The relay's gate exists to fail fast and avoid even reaching the
86/// link for plainly out-of-policy requests, not to substitute for far-side
87/// enforcement.
88pub struct RelayFabric {
89 inner: EvalFabricRef,
90 allowed: BTreeSet<CapabilityName>,
91 status: RelayStatus,
92}
93
94impl RelayFabric {
95 /// Builds a connected relay over `inner` that allows only `allowed`.
96 ///
97 /// The relay starts [`RelayStatus::Connected`]. Any request whose
98 /// [`EvalRequest::required_capabilities`] step outside `allowed` is refused
99 /// rather than forwarded, so the relay can never grant more than it was
100 /// configured to pass through.
101 pub fn new(inner: EvalFabricRef, allowed: Vec<CapabilityName>) -> Self {
102 Self {
103 inner,
104 allowed: allowed.into_iter().collect(),
105 status: RelayStatus::Connected,
106 }
107 }
108
109 /// Returns the relay's current link [`RelayStatus`].
110 pub fn status(&self) -> RelayStatus {
111 self.status
112 }
113
114 /// Reports whether `capability` is within the relay's allowed set.
115 pub fn allows(&self, capability: &CapabilityName) -> bool {
116 self.allowed.contains(capability)
117 }
118
119 /// Severs the link, moving the relay to [`RelayStatus::Disconnected`].
120 ///
121 /// While disconnected, [`RelayFabric::realize`] fails closed and never
122 /// reaches the inner fabric.
123 pub fn disconnect(&mut self) {
124 self.status = RelayStatus::Disconnected;
125 }
126
127 /// Marks the link as [`RelayStatus::Reconnecting`].
128 ///
129 /// Requests still fail closed until [`RelayFabric::reconnect`] completes the
130 /// handshake.
131 pub fn begin_reconnect(&mut self) {
132 self.status = RelayStatus::Reconnecting;
133 }
134
135 /// Restores the link to [`RelayStatus::Connected`].
136 ///
137 /// This only re-opens the gate so that subsequent [`RelayFabric::realize`]
138 /// attempts may proceed. It carries no session state and does NOT resume any
139 /// request that was in flight when the link dropped: such a request already
140 /// returned `Err` and is the caller's to retry as a fresh attempt.
141 pub fn reconnect(&mut self) {
142 self.status = RelayStatus::Connected;
143 }
144}
145
146impl EvalFabric for RelayFabric {
147 fn realize(&self, cx: &mut Cx, request: EvalRequest) -> Result<EvalReply> {
148 match self.status {
149 RelayStatus::Connected => {}
150 RelayStatus::Reconnecting => {
151 return Err(Error::Eval(
152 "relay reconnecting: cannot realize until the link is restored".to_owned(),
153 ));
154 }
155 RelayStatus::Disconnected => {
156 return Err(Error::Eval(
157 "relay disconnected: cannot realize over a severed relay link".to_owned(),
158 ));
159 }
160 }
161
162 // Fail closed before delegating: a relay cannot escalate capabilities,
163 // so a request whose self-declared required capabilities step outside
164 // the allowed set is refused and the inner fabric is never reached. This
165 // gate presumes truthful declaration; the far-side fabric is the real
166 // authority (see the type docs).
167 for capability in &request.required_capabilities {
168 if !self.allowed.contains(capability) {
169 return Err(Error::CapabilityDenied {
170 capability: capability.clone(),
171 });
172 }
173 }
174
175 self.inner.realize(cx, request)
176 }
177}
178
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use sim_kernel::testing::bare_cx as cx;
    use sim_kernel::{
        CapabilityName, Consistency, Cx, Error, EvalFabric, EvalMode, EvalReply, EvalRequest, Expr,
        Result, Value,
    };

    use super::{RelayFabric, RelayStatus};

    /// An inner fabric that records the capabilities of every request it sees
    /// and answers with a fixed reply, like the kernel's `StaticFabric`.
    struct RecordingFabric {
        /// The reply cloned out for every request.
        reply: EvalReply,
        /// One entry per request that reached this fabric, in arrival order.
        seen: Mutex<Vec<Vec<CapabilityName>>>,
    }

    impl EvalFabric for RecordingFabric {
        fn realize(&self, _cx: &mut Cx, request: EvalRequest) -> Result<EvalReply> {
            self.seen
                .lock()
                .expect("seen mutex")
                .push(request.required_capabilities.clone());
            Ok(self.reply.clone())
        }
    }

    /// Builds a [`RecordingFabric`] that always replies with `value` and has
    /// seen no requests yet.
    fn recording(value: Value) -> Arc<RecordingFabric> {
        Arc::new(RecordingFabric {
            reply: EvalReply {
                value,
                diagnostics: Vec::new(),
                trace: None,
            },
            seen: Mutex::new(Vec::new()),
        })
    }

    /// Builds a minimal request that declares exactly `caps` as its required
    /// capabilities.
    fn request(caps: Vec<CapabilityName>) -> EvalRequest {
        EvalRequest {
            expr: Expr::Nil,
            result_shape: None,
            required_capabilities: caps,
            deadline: None,
            consistency: Consistency::LocalFirst,
            mode: EvalMode::Eval,
            answer_limit: None,
            stream_buffer: None,
            stream: false,
            trace: false,
        }
    }

    #[test]
    fn allowed_request_passes_through_to_inner_reply() {
        let mut cx = cx();
        let value = cx.factory().string("ok".to_owned()).unwrap();
        let cap = CapabilityName::new("relay.allowed");
        let inner = recording(value.clone());
        let relay = RelayFabric::new(inner.clone(), vec![cap.clone()]);

        let reply = relay.realize(&mut cx, request(vec![cap.clone()])).unwrap();

        // Location transparency: the relay returns exactly the inner reply.
        assert_eq!(reply.value, value);
        assert!(reply.diagnostics.is_empty());
        let seen = inner.seen.lock().unwrap();
        assert_eq!(seen.as_slice(), &[vec![cap]]);
    }

    #[test]
    fn relay_has_no_caching_layer() {
        let mut cx = cx();
        let allowed = CapabilityName::new("relay.allowed");
        let value = cx.factory().string("no-cache".to_owned()).unwrap();
        let inner = recording(value);
        let relay = RelayFabric::new(inner.clone(), vec![allowed.clone()]);

        relay
            .realize(&mut cx, request(vec![allowed.clone()]))
            .unwrap();
        relay.realize(&mut cx, request(vec![allowed])).unwrap();

        assert_eq!(
            inner.seen.lock().unwrap().len(),
            2,
            "relay must not cache; every call must reach inner"
        );
    }

    #[test]
    fn refused_capability_fails_closed_without_reaching_inner() {
        let mut cx = cx();
        let value = cx.factory().nil().unwrap();
        let allowed = CapabilityName::new("relay.allowed");
        let refused = CapabilityName::new("relay.refused");
        let inner = recording(value);
        let relay = RelayFabric::new(inner.clone(), vec![allowed]);

        let err = relay
            .realize(&mut cx, request(vec![refused.clone()]))
            .err()
            .expect("refused request must fail closed");

        match err {
            Error::CapabilityDenied { capability } => assert_eq!(capability, refused),
            other => panic!("expected CapabilityDenied naming the refused capability, got {other}"),
        }
        // No escalation: the inner fabric was never asked to realize anything.
        assert!(inner.seen.lock().unwrap().is_empty());
    }

    #[test]
    fn allows_agrees_with_the_realize_gate() {
        let mut cx = cx();
        let value = cx.factory().nil().unwrap();
        let allowed = CapabilityName::new("relay.allowed");
        let refused = CapabilityName::new("relay.refused");
        let inner = recording(value);
        let relay = RelayFabric::new(inner.clone(), vec![allowed.clone()]);

        // `allows` reports membership in the configured set ...
        assert!(relay.allows(&allowed));
        assert!(!relay.allows(&refused));

        // ... and `realize` makes the same decision for each capability.
        assert!(relay.realize(&mut cx, request(vec![allowed])).is_ok());
        assert!(relay.realize(&mut cx, request(vec![refused])).is_err());
        assert_eq!(inner.seen.lock().unwrap().len(), 1);
    }

    #[test]
    fn disconnect_fails_closed_then_reconnect_restores_connectivity() {
        let mut cx = cx();
        let value = cx.factory().string("reconnected".to_owned()).unwrap();
        let cap = CapabilityName::new("relay.allowed");
        let inner = recording(value.clone());
        let mut relay = RelayFabric::new(inner.clone(), vec![cap.clone()]);

        relay.disconnect();
        assert_eq!(relay.status(), RelayStatus::Disconnected);
        let err = relay
            .realize(&mut cx, request(vec![cap.clone()]))
            .err()
            .expect("disconnected relay must fail closed");
        assert!(matches!(err, Error::Eval(message) if message.contains("relay disconnected")));
        // No resume: the dropped attempt never reached the inner fabric.
        assert!(inner.seen.lock().unwrap().is_empty());

        relay.begin_reconnect();
        assert_eq!(relay.status(), RelayStatus::Reconnecting);
        // Still fails closed mid-handshake, with the reconnecting-specific
        // error, and still without touching the inner fabric.
        let err = relay
            .realize(&mut cx, request(vec![cap.clone()]))
            .err()
            .expect("reconnecting relay must fail closed");
        assert!(matches!(err, Error::Eval(message) if message.contains("relay reconnecting")));
        assert!(inner.seen.lock().unwrap().is_empty());

        // Reconnect only re-opens the gate; it carries no token or session state.
        relay.reconnect();
        assert_eq!(relay.status(), RelayStatus::Connected);
        let reply = relay.realize(&mut cx, request(vec![cap.clone()])).unwrap();
        assert_eq!(reply.value, value);

        // A second drop + reconnect cycle simply restores connectivity again;
        // each realize is an independent at-most-once attempt.
        relay.disconnect();
        relay.begin_reconnect();
        relay.reconnect();
        assert_eq!(relay.status(), RelayStatus::Connected);
        relay.realize(&mut cx, request(vec![cap])).unwrap();
        // Only the two successful attempts reached the inner fabric: the failed
        // ones were neither replayed nor resumed, and nothing was deduplicated.
        assert_eq!(inner.seen.lock().unwrap().len(), 2);
    }
}
334}