Skip to main content

sim_lib_stream_fabric/
placement_security.rs

1use sim_kernel::{CapabilityName, Expr, Result, Symbol};
2use sim_lib_stream_core::{
3    StreamRedactionFinding, StreamRemoteLimits, StreamSecurityPolicy,
4    stream_host_device_capability, stream_remote_render_capability,
5};
6
/// Permission a peer needs before it may place work on the stream fabric.
///
/// Every variant has a stable wire label and a matching kernel
/// `CapabilityName`. Together they govern where a node is allowed to run
/// (the server or a LAN peer), whether remote rendering is permitted, and
/// whether a host audio/IO device may be reached.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum PlacementCapability {
    /// Permits running a placed node on the server site.
    RunNodeOnServer,
    /// Permits running a placed node on a LAN peer site.
    RunNodeOnLanPeer,
    /// Permits rendering a fragment remotely and sending back the result.
    RemoteRender,
    /// Permits reaching a host device (audio, `/dev`, hardware port) while
    /// placing work.
    HostDevice,
}
23
24impl PlacementCapability {
25    /// Returns the stable wire label string for this capability.
26    pub fn wire_label(self) -> &'static str {
27        match self {
28            Self::RunNodeOnServer => "stream.placement.run-node-on-server",
29            Self::RunNodeOnLanPeer => "stream.placement.run-node-on-lan-peer",
30            Self::RemoteRender => "stream.remote.render",
31            Self::HostDevice => "stream.host.device",
32        }
33    }
34
35    /// Returns the kernel capability name this placement capability requires.
36    pub fn capability(self) -> CapabilityName {
37        match self {
38            Self::RemoteRender => stream_remote_render_capability(),
39            Self::HostDevice => stream_host_device_capability(),
40            Self::RunNodeOnServer | Self::RunNodeOnLanPeer => {
41                CapabilityName::new(self.wire_label())
42            }
43        }
44    }
45
46    /// Returns the qualified symbol identifying this capability.
47    pub fn symbol(self) -> Symbol {
48        Symbol::qualified("stream/placement-capability", self.wire_label())
49    }
50}
51
/// Bounds applied to a server placement, both up front and while it runs.
///
/// The struct pairs compute bounds (CPU time, memory, inflight work) with the
/// frame-shaping bounds it shares with `StreamRemoteLimits`. A placement that
/// would break any of them is refused instead of running unbounded; see
/// [`PlacementResourceLimits::validate`]. Conservative values are supplied by
/// the [`Default`] impl.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct PlacementResourceLimits {
    /// CPU-time budget for the node, in milliseconds.
    ///
    /// NOTE(review): whether this is enforced as wall-clock time or as
    /// consumed CPU time is not visible here; confirm against the enforcer.
    pub max_cpu_time_ms: u64,
    /// Ceiling, in bytes, on accounted payload memory across emitted frames.
    pub max_memory_bytes: usize,
    /// Ceiling, in bytes, on the encoded payload of any single frame.
    pub max_frame_payload_bytes: usize,
    /// Ceiling on how many data frames one placement may emit.
    pub max_stream_frames: usize,
    /// Longest stream duration, in milliseconds, before truncation.
    pub max_duration_ms: u64,
    /// Highest stream rate, in hertz; feeds the effective frame limit.
    pub max_rate_hz: u32,
    /// Ceiling on how many frames may be in flight simultaneously.
    pub max_inflight_work: usize,
}
76
77impl Default for PlacementResourceLimits {
78    fn default() -> Self {
79        let limits = StreamRemoteLimits::default();
80        Self {
81            max_cpu_time_ms: 5_000,
82            max_memory_bytes: 64 * 1024 * 1024,
83            max_frame_payload_bytes: limits.max_frame_payload_bytes,
84            max_stream_frames: limits.max_stream_frames,
85            max_duration_ms: limits.max_duration_ms,
86            max_rate_hz: limits.max_rate_hz,
87            max_inflight_work: limits.max_inflight_frames,
88        }
89    }
90}
91
92impl PlacementResourceLimits {
93    /// Validates that every bound is positive and frame-consistent.
94    ///
95    /// Returns an error when CPU time, memory, stream-size, or inflight-work is
96    /// zero, or when the derived [`PlacementResourceLimits::remote_limits`] are
97    /// themselves invalid.
98    pub fn validate(self) -> Result<()> {
99        if self.max_cpu_time_ms == 0 {
100            return Err(sim_kernel::Error::Eval(
101                "placement cpu-time limit must be positive".to_owned(),
102            ));
103        }
104        if self.max_memory_bytes == 0 {
105            return Err(sim_kernel::Error::Eval(
106                "placement memory limit must be positive".to_owned(),
107            ));
108        }
109        if self.max_stream_frames == 0 {
110            return Err(sim_kernel::Error::Eval(
111                "placement stream-size limit must be positive".to_owned(),
112            ));
113        }
114        if self.max_inflight_work == 0 {
115            return Err(sim_kernel::Error::Eval(
116                "placement inflight-work limit must be positive".to_owned(),
117            ));
118        }
119        self.remote_limits().validate()
120    }
121
122    /// Projects the frame-shaping subset of these limits onto a
123    /// `StreamRemoteLimits`, mapping inflight-work to the inflight-frame bound.
124    pub fn remote_limits(self) -> StreamRemoteLimits {
125        StreamRemoteLimits {
126            max_frame_payload_bytes: self.max_frame_payload_bytes,
127            max_stream_frames: self.max_stream_frames,
128            max_inflight_frames: self.max_inflight_work,
129            max_duration_ms: self.max_duration_ms,
130            max_rate_hz: self.max_rate_hz,
131            max_binary_payload_bytes: StreamRemoteLimits::default().max_binary_payload_bytes,
132        }
133    }
134
135    /// Returns the effective per-stream frame limit after duration and rate
136    /// bounds are folded into the configured stream-size limit.
137    pub fn effective_stream_frame_limit(self) -> usize {
138        self.remote_limits().effective_frame_limit()
139    }
140
141    /// Encodes these limits as a string-valued map `Expr` for placement reports.
142    pub fn to_expr(self) -> Expr {
143        Expr::Map(vec![
144            field("max-cpu-time-ms", self.max_cpu_time_ms.to_string()),
145            field("max-memory-bytes", self.max_memory_bytes.to_string()),
146            field(
147                "max-frame-payload-bytes",
148                self.max_frame_payload_bytes.to_string(),
149            ),
150            field("max-stream-frames", self.max_stream_frames.to_string()),
151            field("max-duration-ms", self.max_duration_ms.to_string()),
152            field("max-rate-hz", self.max_rate_hz.to_string()),
153            field("max-inflight-work", self.max_inflight_work.to_string()),
154        ])
155    }
156}
157
158/// Returns the capability required to run a placed node on the server site.
159pub fn placement_run_node_on_server_capability() -> CapabilityName {
160    PlacementCapability::RunNodeOnServer.capability()
161}
162
163/// Returns the capability required to run a placed node on a LAN peer site.
164pub fn placement_run_node_on_lan_peer_capability() -> CapabilityName {
165    PlacementCapability::RunNodeOnLanPeer.capability()
166}
167
168/// Returns the capability required to render a fragment remotely.
169pub fn placement_remote_render_capability() -> CapabilityName {
170    PlacementCapability::RemoteRender.capability()
171}
172
173/// Returns the capability required to reach a host device during placement.
174pub fn placement_host_device_capability() -> CapabilityName {
175    PlacementCapability::HostDevice.capability()
176}
177
178/// Returns the kernel capability names for every [`PlacementCapability`].
179pub fn placement_capability_names() -> Vec<CapabilityName> {
180    [
181        PlacementCapability::RunNodeOnServer,
182        PlacementCapability::RunNodeOnLanPeer,
183        PlacementCapability::RemoteRender,
184        PlacementCapability::HostDevice,
185    ]
186    .into_iter()
187    .map(PlacementCapability::capability)
188    .collect()
189}
190
191/// Returns a copy of `expr` with sensitive placement data redacted.
192///
193/// Walks the expression under the default `StreamSecurityPolicy`, replacing
194/// flagged strings, symbols, large binary payloads, and host-device references
195/// with redaction placeholders so placement reports can be safely transported.
196pub fn redact_placement_expr(expr: &Expr) -> Expr {
197    redact_expr(expr, StreamSecurityPolicy::default())
198}
199
200/// Returns `symbol`, or a redaction placeholder if it names sensitive data.
201///
202/// A symbol is redacted when the default security policy flags its qualified
203/// text or when it references a host device.
204pub fn redact_placement_symbol(symbol: &Symbol) -> Symbol {
205    if StreamSecurityPolicy::default()
206        .finding_for_text(&symbol.as_qualified_str())
207        .is_some()
208        || placement_text_uses_host_device(&symbol.as_qualified_str())
209    {
210        Symbol::qualified("stream/redacted", "placement")
211    } else {
212        symbol.clone()
213    }
214}
215
216/// Returns whether any symbol, string, or nested node in `expr` references a
217/// host device.
218pub fn placement_expr_uses_host_device(expr: &Expr) -> bool {
219    match expr {
220        Expr::Symbol(symbol) | Expr::Local(symbol) => {
221            placement_text_uses_host_device(&symbol.as_qualified_str())
222        }
223        Expr::String(value) => placement_text_uses_host_device(value),
224        Expr::List(items) | Expr::Vector(items) | Expr::Set(items) | Expr::Block(items) => {
225            items.iter().any(placement_expr_uses_host_device)
226        }
227        Expr::Map(entries) => entries.iter().any(|(key, value)| {
228            placement_expr_uses_host_device(key) || placement_expr_uses_host_device(value)
229        }),
230        Expr::Call { operator, args } => {
231            placement_expr_uses_host_device(operator)
232                || args.iter().any(placement_expr_uses_host_device)
233        }
234        Expr::Infix {
235            operator,
236            left,
237            right,
238        } => {
239            placement_text_uses_host_device(&operator.as_qualified_str())
240                || placement_expr_uses_host_device(left)
241                || placement_expr_uses_host_device(right)
242        }
243        Expr::Prefix { operator, arg } | Expr::Postfix { operator, arg } => {
244            placement_text_uses_host_device(&operator.as_qualified_str())
245                || placement_expr_uses_host_device(arg)
246        }
247        Expr::Quote { expr, .. } => placement_expr_uses_host_device(expr),
248        Expr::Annotated { expr, annotations } => {
249            placement_expr_uses_host_device(expr)
250                || annotations.iter().any(|(key, value)| {
251                    placement_text_uses_host_device(&key.as_qualified_str())
252                        || placement_expr_uses_host_device(value)
253                })
254        }
255        Expr::Extension { tag, payload } => {
256            placement_text_uses_host_device(&tag.as_qualified_str())
257                || placement_expr_uses_host_device(payload)
258        }
259        _ => false,
260    }
261}
262
/// Reports whether `value` looks like a reference to a host device.
///
/// The text matches when it contains any of the markers `/dev/`, `hw:`,
/// `CoreAudio`, `ALSA`, or `host-device` anywhere, or when it begins with
/// `device/` or `host/device`. Matching is case-sensitive.
pub fn placement_text_uses_host_device(value: &str) -> bool {
    const MARKERS: [&str; 5] = ["/dev/", "hw:", "CoreAudio", "ALSA", "host-device"];
    const PREFIXES: [&str; 2] = ["device/", "host/device"];

    MARKERS.iter().any(|&marker| value.contains(marker))
        || PREFIXES.iter().any(|&prefix| value.starts_with(prefix))
}
273
274fn redact_expr(expr: &Expr, policy: StreamSecurityPolicy) -> Expr {
275    match expr {
276        Expr::Symbol(symbol) => Expr::Symbol(redact_placement_symbol(symbol)),
277        Expr::Local(symbol) => Expr::Local(redact_placement_symbol(symbol)),
278        Expr::String(value) => Expr::String(redact_text(value, policy)),
279        Expr::Bytes(bytes)
280            if policy.finding_for_expr(expr) == Some(StreamRedactionFinding::LargeBinaryData) =>
281        {
282            Expr::String("[redacted placement payload]".to_owned())
283        }
284        Expr::List(items) => {
285            Expr::List(items.iter().map(|item| redact_expr(item, policy)).collect())
286        }
287        Expr::Vector(items) => {
288            Expr::Vector(items.iter().map(|item| redact_expr(item, policy)).collect())
289        }
290        Expr::Set(items) => Expr::Set(items.iter().map(|item| redact_expr(item, policy)).collect()),
291        Expr::Map(entries) => Expr::Map(
292            entries
293                .iter()
294                .map(|(key, value)| (redact_expr(key, policy), redact_expr(value, policy)))
295                .collect(),
296        ),
297        Expr::Call { operator, args } => Expr::Call {
298            operator: Box::new(redact_expr(operator, policy)),
299            args: args.iter().map(|arg| redact_expr(arg, policy)).collect(),
300        },
301        Expr::Infix {
302            operator,
303            left,
304            right,
305        } => Expr::Infix {
306            operator: redact_placement_symbol(operator),
307            left: Box::new(redact_expr(left, policy)),
308            right: Box::new(redact_expr(right, policy)),
309        },
310        Expr::Prefix { operator, arg } => Expr::Prefix {
311            operator: redact_placement_symbol(operator),
312            arg: Box::new(redact_expr(arg, policy)),
313        },
314        Expr::Postfix { operator, arg } => Expr::Postfix {
315            operator: redact_placement_symbol(operator),
316            arg: Box::new(redact_expr(arg, policy)),
317        },
318        Expr::Block(items) => {
319            Expr::Block(items.iter().map(|item| redact_expr(item, policy)).collect())
320        }
321        Expr::Quote { mode, expr } => Expr::Quote {
322            mode: *mode,
323            expr: Box::new(redact_expr(expr, policy)),
324        },
325        Expr::Annotated { expr, annotations } => Expr::Annotated {
326            expr: Box::new(redact_expr(expr, policy)),
327            annotations: annotations
328                .iter()
329                .map(|(key, value)| (redact_placement_symbol(key), redact_expr(value, policy)))
330                .collect(),
331        },
332        Expr::Extension { tag, payload } => Expr::Extension {
333            tag: redact_placement_symbol(tag),
334            payload: Box::new(redact_expr(payload, policy)),
335        },
336        other => other.clone(),
337    }
338}
339
340fn redact_text(value: &str, policy: StreamSecurityPolicy) -> String {
341    if policy.finding_for_text(value).is_some() || placement_text_uses_host_device(value) {
342        "[redacted placement data]".to_owned()
343    } else {
344        value.to_owned()
345    }
346}
347
348fn field(name: &str, value: String) -> (Expr, Expr) {
349    (Expr::Symbol(Symbol::new(name)), Expr::String(value))
350}