1use sim_kernel::{CapabilityName, Expr, Result, Symbol};
2use sim_lib_stream_core::{
3 StreamRedactionFinding, StreamRemoteLimits, StreamSecurityPolicy,
4 stream_host_device_capability, stream_remote_render_capability,
5};
6
7#[derive(Clone, Copy, Debug, PartialEq, Eq)]
13pub enum PlacementCapability {
14 RunNodeOnServer,
16 RunNodeOnLanPeer,
18 RemoteRender,
20 HostDevice,
22}
23
24impl PlacementCapability {
25 pub fn wire_label(self) -> &'static str {
27 match self {
28 Self::RunNodeOnServer => "stream.placement.run-node-on-server",
29 Self::RunNodeOnLanPeer => "stream.placement.run-node-on-lan-peer",
30 Self::RemoteRender => "stream.remote.render",
31 Self::HostDevice => "stream.host.device",
32 }
33 }
34
35 pub fn capability(self) -> CapabilityName {
37 match self {
38 Self::RemoteRender => stream_remote_render_capability(),
39 Self::HostDevice => stream_host_device_capability(),
40 Self::RunNodeOnServer | Self::RunNodeOnLanPeer => {
41 CapabilityName::new(self.wire_label())
42 }
43 }
44 }
45
46 pub fn symbol(self) -> Symbol {
48 Symbol::qualified("stream/placement-capability", self.wire_label())
49 }
50}
51
52#[derive(Clone, Copy, Debug, PartialEq, Eq)]
60pub struct PlacementResourceLimits {
61 pub max_cpu_time_ms: u64,
63 pub max_memory_bytes: usize,
65 pub max_frame_payload_bytes: usize,
67 pub max_stream_frames: usize,
69 pub max_duration_ms: u64,
71 pub max_rate_hz: u32,
73 pub max_inflight_work: usize,
75}
76
77impl Default for PlacementResourceLimits {
78 fn default() -> Self {
79 let limits = StreamRemoteLimits::default();
80 Self {
81 max_cpu_time_ms: 5_000,
82 max_memory_bytes: 64 * 1024 * 1024,
83 max_frame_payload_bytes: limits.max_frame_payload_bytes,
84 max_stream_frames: limits.max_stream_frames,
85 max_duration_ms: limits.max_duration_ms,
86 max_rate_hz: limits.max_rate_hz,
87 max_inflight_work: limits.max_inflight_frames,
88 }
89 }
90}
91
92impl PlacementResourceLimits {
93 pub fn validate(self) -> Result<()> {
99 if self.max_cpu_time_ms == 0 {
100 return Err(sim_kernel::Error::Eval(
101 "placement cpu-time limit must be positive".to_owned(),
102 ));
103 }
104 if self.max_memory_bytes == 0 {
105 return Err(sim_kernel::Error::Eval(
106 "placement memory limit must be positive".to_owned(),
107 ));
108 }
109 if self.max_stream_frames == 0 {
110 return Err(sim_kernel::Error::Eval(
111 "placement stream-size limit must be positive".to_owned(),
112 ));
113 }
114 if self.max_inflight_work == 0 {
115 return Err(sim_kernel::Error::Eval(
116 "placement inflight-work limit must be positive".to_owned(),
117 ));
118 }
119 self.remote_limits().validate()
120 }
121
122 pub fn remote_limits(self) -> StreamRemoteLimits {
125 StreamRemoteLimits {
126 max_frame_payload_bytes: self.max_frame_payload_bytes,
127 max_stream_frames: self.max_stream_frames,
128 max_inflight_frames: self.max_inflight_work,
129 max_duration_ms: self.max_duration_ms,
130 max_rate_hz: self.max_rate_hz,
131 max_binary_payload_bytes: StreamRemoteLimits::default().max_binary_payload_bytes,
132 }
133 }
134
135 pub fn effective_stream_frame_limit(self) -> usize {
138 self.remote_limits().effective_frame_limit()
139 }
140
141 pub fn to_expr(self) -> Expr {
143 Expr::Map(vec![
144 field("max-cpu-time-ms", self.max_cpu_time_ms.to_string()),
145 field("max-memory-bytes", self.max_memory_bytes.to_string()),
146 field(
147 "max-frame-payload-bytes",
148 self.max_frame_payload_bytes.to_string(),
149 ),
150 field("max-stream-frames", self.max_stream_frames.to_string()),
151 field("max-duration-ms", self.max_duration_ms.to_string()),
152 field("max-rate-hz", self.max_rate_hz.to_string()),
153 field("max-inflight-work", self.max_inflight_work.to_string()),
154 ])
155 }
156}
157
158pub fn placement_run_node_on_server_capability() -> CapabilityName {
160 PlacementCapability::RunNodeOnServer.capability()
161}
162
163pub fn placement_run_node_on_lan_peer_capability() -> CapabilityName {
165 PlacementCapability::RunNodeOnLanPeer.capability()
166}
167
168pub fn placement_remote_render_capability() -> CapabilityName {
170 PlacementCapability::RemoteRender.capability()
171}
172
173pub fn placement_host_device_capability() -> CapabilityName {
175 PlacementCapability::HostDevice.capability()
176}
177
178pub fn placement_capability_names() -> Vec<CapabilityName> {
180 [
181 PlacementCapability::RunNodeOnServer,
182 PlacementCapability::RunNodeOnLanPeer,
183 PlacementCapability::RemoteRender,
184 PlacementCapability::HostDevice,
185 ]
186 .into_iter()
187 .map(PlacementCapability::capability)
188 .collect()
189}
190
191pub fn redact_placement_expr(expr: &Expr) -> Expr {
197 redact_expr(expr, StreamSecurityPolicy::default())
198}
199
200pub fn redact_placement_symbol(symbol: &Symbol) -> Symbol {
205 if StreamSecurityPolicy::default()
206 .finding_for_text(&symbol.as_qualified_str())
207 .is_some()
208 || placement_text_uses_host_device(&symbol.as_qualified_str())
209 {
210 Symbol::qualified("stream/redacted", "placement")
211 } else {
212 symbol.clone()
213 }
214}
215
216pub fn placement_expr_uses_host_device(expr: &Expr) -> bool {
219 match expr {
220 Expr::Symbol(symbol) | Expr::Local(symbol) => {
221 placement_text_uses_host_device(&symbol.as_qualified_str())
222 }
223 Expr::String(value) => placement_text_uses_host_device(value),
224 Expr::List(items) | Expr::Vector(items) | Expr::Set(items) | Expr::Block(items) => {
225 items.iter().any(placement_expr_uses_host_device)
226 }
227 Expr::Map(entries) => entries.iter().any(|(key, value)| {
228 placement_expr_uses_host_device(key) || placement_expr_uses_host_device(value)
229 }),
230 Expr::Call { operator, args } => {
231 placement_expr_uses_host_device(operator)
232 || args.iter().any(placement_expr_uses_host_device)
233 }
234 Expr::Infix {
235 operator,
236 left,
237 right,
238 } => {
239 placement_text_uses_host_device(&operator.as_qualified_str())
240 || placement_expr_uses_host_device(left)
241 || placement_expr_uses_host_device(right)
242 }
243 Expr::Prefix { operator, arg } | Expr::Postfix { operator, arg } => {
244 placement_text_uses_host_device(&operator.as_qualified_str())
245 || placement_expr_uses_host_device(arg)
246 }
247 Expr::Quote { expr, .. } => placement_expr_uses_host_device(expr),
248 Expr::Annotated { expr, annotations } => {
249 placement_expr_uses_host_device(expr)
250 || annotations.iter().any(|(key, value)| {
251 placement_text_uses_host_device(&key.as_qualified_str())
252 || placement_expr_uses_host_device(value)
253 })
254 }
255 Expr::Extension { tag, payload } => {
256 placement_text_uses_host_device(&tag.as_qualified_str())
257 || placement_expr_uses_host_device(payload)
258 }
259 _ => false,
260 }
261}
262
263pub fn placement_text_uses_host_device(value: &str) -> bool {
265 value.contains("/dev/")
266 || value.contains("hw:")
267 || value.contains("CoreAudio")
268 || value.contains("ALSA")
269 || value.contains("host-device")
270 || value.starts_with("device/")
271 || value.starts_with("host/device")
272}
273
274fn redact_expr(expr: &Expr, policy: StreamSecurityPolicy) -> Expr {
275 match expr {
276 Expr::Symbol(symbol) => Expr::Symbol(redact_placement_symbol(symbol)),
277 Expr::Local(symbol) => Expr::Local(redact_placement_symbol(symbol)),
278 Expr::String(value) => Expr::String(redact_text(value, policy)),
279 Expr::Bytes(bytes)
280 if policy.finding_for_expr(expr) == Some(StreamRedactionFinding::LargeBinaryData) =>
281 {
282 Expr::String("[redacted placement payload]".to_owned())
283 }
284 Expr::List(items) => {
285 Expr::List(items.iter().map(|item| redact_expr(item, policy)).collect())
286 }
287 Expr::Vector(items) => {
288 Expr::Vector(items.iter().map(|item| redact_expr(item, policy)).collect())
289 }
290 Expr::Set(items) => Expr::Set(items.iter().map(|item| redact_expr(item, policy)).collect()),
291 Expr::Map(entries) => Expr::Map(
292 entries
293 .iter()
294 .map(|(key, value)| (redact_expr(key, policy), redact_expr(value, policy)))
295 .collect(),
296 ),
297 Expr::Call { operator, args } => Expr::Call {
298 operator: Box::new(redact_expr(operator, policy)),
299 args: args.iter().map(|arg| redact_expr(arg, policy)).collect(),
300 },
301 Expr::Infix {
302 operator,
303 left,
304 right,
305 } => Expr::Infix {
306 operator: redact_placement_symbol(operator),
307 left: Box::new(redact_expr(left, policy)),
308 right: Box::new(redact_expr(right, policy)),
309 },
310 Expr::Prefix { operator, arg } => Expr::Prefix {
311 operator: redact_placement_symbol(operator),
312 arg: Box::new(redact_expr(arg, policy)),
313 },
314 Expr::Postfix { operator, arg } => Expr::Postfix {
315 operator: redact_placement_symbol(operator),
316 arg: Box::new(redact_expr(arg, policy)),
317 },
318 Expr::Block(items) => {
319 Expr::Block(items.iter().map(|item| redact_expr(item, policy)).collect())
320 }
321 Expr::Quote { mode, expr } => Expr::Quote {
322 mode: *mode,
323 expr: Box::new(redact_expr(expr, policy)),
324 },
325 Expr::Annotated { expr, annotations } => Expr::Annotated {
326 expr: Box::new(redact_expr(expr, policy)),
327 annotations: annotations
328 .iter()
329 .map(|(key, value)| (redact_placement_symbol(key), redact_expr(value, policy)))
330 .collect(),
331 },
332 Expr::Extension { tag, payload } => Expr::Extension {
333 tag: redact_placement_symbol(tag),
334 payload: Box::new(redact_expr(payload, policy)),
335 },
336 other => other.clone(),
337 }
338}
339
340fn redact_text(value: &str, policy: StreamSecurityPolicy) -> String {
341 if policy.finding_for_text(value).is_some() || placement_text_uses_host_device(value) {
342 "[redacted placement data]".to_owned()
343 } else {
344 value.to_owned()
345 }
346}
347
348fn field(name: &str, value: String) -> (Expr, Expr) {
349 (Expr::Symbol(Symbol::new(name)), Expr::String(value))
350}