Skip to main content

sim_lib_stream_fabric/
relay.rs

1//! Remote and relayed surfaces over the kernel [`EvalFabric`].
2//!
3//! [`EvalFabric`] is location-transparent by design: a caller submits an
4//! [`EvalRequest`] and receives an [`EvalReply`] without knowing whether the
5//! work ran locally or across a network. A REMOTE or RELAYED surface is
6//! therefore just another [`EvalFabric`] implementation that forwards to an
7//! inner [`EvalFabricRef`] living on the far side of a link.
8//!
9//! [`RelayFabric`] models that far-side node -- for example a phone bridging a
10//! watch's hardware to SIM. It is a CONNECTIVITY gate, not a resume engine. Its
11//! load-bearing contract is twofold:
12//!
13//! - **Location transparency when connected.** When the link is up and policy
14//!   allows the request, [`RelayFabric::realize`] returns exactly the inner
15//!   fabric's reply.
16//! - **No capability escalation.** A relay can only let through what its own
17//!   policy already allows. A request that names a capability outside the
18//!   relay's allowed set is refused fail-closed, naming the capability, and the
19//!   inner fabric is never reached.
20//!
21//! # No resume; at-most-once per attempt
22//!
23//! The relay does NOT recover in-flight work across a link drop. Each
24//! [`RelayFabric::realize`] call is a single attempt with at-most-once
25//! semantics: it either reaches the inner fabric and returns its reply, or it
26//! returns `Err`. A request whose link drops mid-flight returns an error and is
27//! NOT auto-resumed or replayed; the caller decides whether to retry, and a
28//! retry is a fresh attempt with no deduplication here. [`RelayFabric::reconnect`]
29//! only restores the link to [`RelayStatus::Connected`] so that subsequent
30//! attempts may proceed; it carries no session state, sequence, or idempotency
31//! token. Idempotency and exactly-once delivery, if needed, are the far-side
32//! fabric's responsibility, not the relay's.
33//!
34//! # Backpressure
35//!
36//! A relay carries a surface session's stream traffic over the crate's existing
37//! bounded-frame discipline; it does not re-implement streaming. The frame codec
38//! applies [`crate::StreamFrameLimits`], which caps in-flight frames, total
39//! frames, payload size, duration, and rate. Once a bound is crossed the encoder
40//! appends a limit diagnostic frame and TRUNCATES -- it stops emitting further
41//! frames. It does NOT shed stale frames or keep the newest: there is no ring
42//! buffer, so a lagging consumer sees a truncated stream with a diagnostic, not
43//! a window of the most recent frames.
44
45use std::collections::BTreeSet;
46
47use sim_kernel::{
48    CapabilityName, Cx, Error, EvalFabric, EvalFabricRef, EvalReply, EvalRequest, Result,
49};
50
/// Link state of a [`RelayFabric`].
///
/// Only [`RelayStatus::Connected`] lets a request through; the other two
/// states make [`RelayFabric::realize`] return an error without forwarding.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RelayStatus {
    /// Link established: requests are eligible to be forwarded.
    Connected,
    /// Link lost, re-establishment under way: requests are refused.
    Reconnecting,
    /// Link cut: requests are refused.
    Disconnected,
}
61
62/// A remote or relayed [`EvalFabric`] node carrying a surface session.
63///
64/// `RelayFabric` wraps an inner [`EvalFabricRef`] reached across a link and
65/// enforces a non-escalating capability policy in front of it. Because
66/// [`EvalFabric`] is location-transparent, code that targets the `realize`
67/// surface treats a relay exactly like a local fabric; the relay differs only
68/// in that it can refuse and that its link can drop (failing closed, with no
69/// resume).
70///
71/// The relay preserves each request's [`Consistency`](sim_kernel::Consistency)
72/// unchanged and lets the inner fabric honor it: the inner node beyond the link
73/// is the remote authority, so a relay boundary is naturally a
74/// `RemoteOnly`/`LocalFirst` hop, but the relay never rewrites the caller's
75/// declared consistency.
76///
77/// # Trust of declared capabilities
78///
79/// The capability gate matches each request's self-declared
80/// [`EvalRequest::required_capabilities`] against the allowed set. The relay
81/// therefore PRESUMES truthful capability declaration: it is a connectivity
82/// gate, not the enforcement authority. A caller that under-declares its
83/// capabilities can pass the relay; the inner / far-side fabric, which performs
84/// the real operations behind its own `cx.require(...)` checks, is the actual
85/// authority. The relay's gate exists to fail fast and avoid even reaching the
86/// link for plainly out-of-policy requests, not to substitute for far-side
87/// enforcement.
88pub struct RelayFabric {
89    inner: EvalFabricRef,
90    allowed: BTreeSet<CapabilityName>,
91    status: RelayStatus,
92}
93
94impl RelayFabric {
95    /// Builds a connected relay over `inner` that allows only `allowed`.
96    ///
97    /// The relay starts [`RelayStatus::Connected`]. Any request whose
98    /// [`EvalRequest::required_capabilities`] step outside `allowed` is refused
99    /// rather than forwarded, so the relay can never grant more than it was
100    /// configured to pass through.
101    pub fn new(inner: EvalFabricRef, allowed: Vec<CapabilityName>) -> Self {
102        Self {
103            inner,
104            allowed: allowed.into_iter().collect(),
105            status: RelayStatus::Connected,
106        }
107    }
108
109    /// Returns the relay's current link [`RelayStatus`].
110    pub fn status(&self) -> RelayStatus {
111        self.status
112    }
113
114    /// Reports whether `capability` is within the relay's allowed set.
115    pub fn allows(&self, capability: &CapabilityName) -> bool {
116        self.allowed.contains(capability)
117    }
118
119    /// Severs the link, moving the relay to [`RelayStatus::Disconnected`].
120    ///
121    /// While disconnected, [`RelayFabric::realize`] fails closed and never
122    /// reaches the inner fabric.
123    pub fn disconnect(&mut self) {
124        self.status = RelayStatus::Disconnected;
125    }
126
127    /// Marks the link as [`RelayStatus::Reconnecting`].
128    ///
129    /// Requests still fail closed until [`RelayFabric::reconnect`] completes the
130    /// handshake.
131    pub fn begin_reconnect(&mut self) {
132        self.status = RelayStatus::Reconnecting;
133    }
134
135    /// Restores the link to [`RelayStatus::Connected`].
136    ///
137    /// This only re-opens the gate so that subsequent [`RelayFabric::realize`]
138    /// attempts may proceed. It carries no session state and does NOT resume any
139    /// request that was in flight when the link dropped: such a request already
140    /// returned `Err` and is the caller's to retry as a fresh attempt.
141    pub fn reconnect(&mut self) {
142        self.status = RelayStatus::Connected;
143    }
144}
145
146impl EvalFabric for RelayFabric {
147    fn realize(&self, cx: &mut Cx, request: EvalRequest) -> Result<EvalReply> {
148        match self.status {
149            RelayStatus::Connected => {}
150            RelayStatus::Reconnecting => {
151                return Err(Error::Eval(
152                    "relay reconnecting: cannot realize until the link is restored".to_owned(),
153                ));
154            }
155            RelayStatus::Disconnected => {
156                return Err(Error::Eval(
157                    "relay disconnected: cannot realize over a severed relay link".to_owned(),
158                ));
159            }
160        }
161
162        // Fail closed before delegating: a relay cannot escalate capabilities,
163        // so a request whose self-declared required capabilities step outside
164        // the allowed set is refused and the inner fabric is never reached. This
165        // gate presumes truthful declaration; the far-side fabric is the real
166        // authority (see the type docs).
167        for capability in &request.required_capabilities {
168            if !self.allowed.contains(capability) {
169                return Err(Error::CapabilityDenied {
170                    capability: capability.clone(),
171                });
172            }
173        }
174
175        self.inner.realize(cx, request)
176    }
177}
178
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use sim_kernel::testing::bare_cx as cx;
    use sim_kernel::{
        CapabilityName, Consistency, Cx, Error, EvalFabric, EvalMode, EvalReply, EvalRequest, Expr,
        Result, Value,
    };

    use super::{RelayFabric, RelayStatus};

    /// An inner fabric that records the capabilities of every request it sees
    /// and answers with a fixed reply, like the kernel's `StaticFabric`.
    ///
    /// `seen` gains one entry per `realize` call, so its length doubles as a
    /// "how many requests reached the inner fabric" counter.
    struct RecordingFabric {
        reply: EvalReply,
        seen: Mutex<Vec<Vec<CapabilityName>>>,
    }

    impl EvalFabric for RecordingFabric {
        fn realize(&self, _cx: &mut Cx, request: EvalRequest) -> Result<EvalReply> {
            self.seen
                .lock()
                .expect("seen mutex")
                .push(request.required_capabilities.clone());
            Ok(self.reply.clone())
        }
    }

    /// Builds a [`RecordingFabric`] that always replies with `value`, no
    /// diagnostics and no trace.
    fn recording(value: Value) -> Arc<RecordingFabric> {
        Arc::new(RecordingFabric {
            reply: EvalReply {
                value,
                diagnostics: Vec::new(),
                trace: None,
            },
            seen: Mutex::new(Vec::new()),
        })
    }

    /// Builds a minimal request that declares exactly `caps` as its required
    /// capabilities; every other field is a fixed placeholder.
    fn request(caps: Vec<CapabilityName>) -> EvalRequest {
        EvalRequest {
            expr: Expr::Nil,
            result_shape: None,
            required_capabilities: caps,
            deadline: None,
            consistency: Consistency::LocalFirst,
            mode: EvalMode::Eval,
            answer_limit: None,
            stream_buffer: None,
            stream: false,
            trace: false,
        }
    }

    #[test]
    fn allowed_request_passes_through_to_inner_reply() {
        let mut cx = cx();
        let value = cx.factory().string("ok".to_owned()).unwrap();
        let cap = CapabilityName::new("relay.allowed");
        let inner = recording(value.clone());
        let relay = RelayFabric::new(inner.clone(), vec![cap.clone()]);

        let reply = relay.realize(&mut cx, request(vec![cap.clone()])).unwrap();

        // Location transparency: the relay returns exactly the inner reply.
        assert_eq!(reply.value, value);
        assert!(reply.diagnostics.is_empty());
        let seen = inner.seen.lock().unwrap();
        assert_eq!(seen.as_slice(), &[vec![cap]]);
    }

    #[test]
    fn relay_has_no_caching_layer() {
        let mut cx = cx();
        let allowed = CapabilityName::new("relay.allowed");
        let value = cx.factory().string("no-cache".to_owned()).unwrap();
        let inner = recording(value);
        let relay = RelayFabric::new(inner.clone(), vec![allowed.clone()]);

        relay
            .realize(&mut cx, request(vec![allowed.clone()]))
            .unwrap();
        relay.realize(&mut cx, request(vec![allowed])).unwrap();

        assert_eq!(
            inner.seen.lock().unwrap().len(),
            2,
            "relay must not cache; every call must reach inner"
        );
    }

    #[test]
    fn refused_capability_fails_closed_without_reaching_inner() {
        let mut cx = cx();
        let value = cx.factory().nil().unwrap();
        let allowed = CapabilityName::new("relay.allowed");
        let refused = CapabilityName::new("relay.refused");
        let inner = recording(value);
        let relay = RelayFabric::new(inner.clone(), vec![allowed]);

        let err = relay
            .realize(&mut cx, request(vec![refused.clone()]))
            .err()
            .expect("refused request must fail closed");

        match err {
            Error::CapabilityDenied { capability } => assert_eq!(capability, refused),
            other => panic!("expected CapabilityDenied naming the refused capability, got {other}"),
        }
        // No escalation: the inner fabric was never asked to realize anything.
        assert!(inner.seen.lock().unwrap().is_empty());
    }

    #[test]
    fn disconnect_fails_closed_then_reconnect_restores_connectivity() {
        let mut cx = cx();
        let value = cx.factory().string("reconnected".to_owned()).unwrap();
        let cap = CapabilityName::new("relay.allowed");
        let inner = recording(value.clone());
        let mut relay = RelayFabric::new(inner.clone(), vec![cap.clone()]);

        relay.disconnect();
        assert_eq!(relay.status(), RelayStatus::Disconnected);
        let err = relay
            .realize(&mut cx, request(vec![cap.clone()]))
            .err()
            .expect("disconnected relay must fail closed");
        assert!(matches!(err, Error::Eval(message) if message.contains("relay disconnected")));
        // No resume: the dropped attempt never reached the inner fabric.
        assert!(inner.seen.lock().unwrap().is_empty());

        relay.begin_reconnect();
        assert_eq!(relay.status(), RelayStatus::Reconnecting);
        // Still fails closed mid-handshake. Pin the variant and message, and
        // check the inner fabric stayed unreached, so that a relay which
        // forwards while reconnecting is caught here rather than only by the
        // final attempt count.
        let err = relay
            .realize(&mut cx, request(vec![cap.clone()]))
            .err()
            .expect("reconnecting relay must fail closed");
        assert!(matches!(err, Error::Eval(message) if message.contains("relay reconnecting")));
        assert!(inner.seen.lock().unwrap().is_empty());

        // Reconnect only re-opens the gate; it carries no token or session state.
        relay.reconnect();
        assert_eq!(relay.status(), RelayStatus::Connected);
        let reply = relay.realize(&mut cx, request(vec![cap.clone()])).unwrap();
        assert_eq!(reply.value, value);

        // A second drop + reconnect cycle simply restores connectivity again;
        // each realize is an independent at-most-once attempt.
        relay.disconnect();
        relay.begin_reconnect();
        relay.reconnect();
        assert_eq!(relay.status(), RelayStatus::Connected);
        relay.realize(&mut cx, request(vec![cap])).unwrap();
        // Two successful attempts reached the inner fabric -- no dedup, no double
        // suppression, exactly the two attempts the caller made.
        assert_eq!(inner.seen.lock().unwrap().len(), 2);
    }
}
334}