1use std::time::Duration;
2
3use sim_kernel::{Cx, Error, Expr, Result, Symbol};
4use sim_lib_server::{
5 FrameEnvelope, FrameKind, ServerFrame, Site, stream_chunk_frame_from_expr, stream_end_frame,
6 stream_frame_from_expr,
7};
8use sim_lib_stream_core::{
9 BufferPolicy, ClockDomain, LatencyClass, PlacedFragment, StreamCapability, StreamDirection,
10 StreamEnvelope, StreamMedia, StreamMetadata, StreamPacket, TransportProfile,
11 stream_remote_network_capability, stream_remote_preview_capability,
12};
13
14use crate::events::diagnostic_stream_packet;
15use crate::placement_report::{
16 placement_uses_host_device, request_report_expr, result_report_expr,
17};
18use crate::placement_security::{
19 PlacementResourceLimits, placement_host_device_capability, placement_remote_render_capability,
20 placement_run_node_on_server_capability, redact_placement_symbol,
21};
22
23#[derive(Clone, Debug, PartialEq, Eq)]
25pub struct ServerPlacementRequest {
26 placement_report: Expr,
27 output_profile: TransportProfile,
28 realtime_pinned: bool,
29 resource_limits: PlacementResourceLimits,
30}
31
32impl ServerPlacementRequest {
33 pub fn new(placement_report: Expr, output_profile: TransportProfile) -> Self {
37 Self {
38 placement_report,
39 output_profile,
40 realtime_pinned: false,
41 resource_limits: PlacementResourceLimits::default(),
42 }
43 }
44
45 pub fn buffered_preview(placement_report: Expr) -> Self {
47 Self::new(placement_report, server_buffered_preview_profile())
48 }
49
50 pub fn render_return(placement_report: Expr) -> Self {
52 Self::new(placement_report, server_render_return_profile())
53 }
54
55 pub fn with_realtime_pin(mut self, realtime_pinned: bool) -> Self {
60 self.realtime_pinned = realtime_pinned;
61 self
62 }
63
64 pub fn with_resource_limits(mut self, resource_limits: PlacementResourceLimits) -> Self {
66 self.resource_limits = resource_limits;
67 self
68 }
69
70 pub fn placement_report(&self) -> &Expr {
72 &self.placement_report
73 }
74
75 pub fn output_profile(&self) -> &TransportProfile {
77 &self.output_profile
78 }
79
80 pub fn realtime_pinned(&self) -> bool {
82 self.realtime_pinned
83 }
84
85 pub fn resource_limits(&self) -> PlacementResourceLimits {
87 self.resource_limits
88 }
89}
90
91pub fn server_site_symbol() -> Symbol {
93 Symbol::qualified("stream/site", "server")
94}
95
96pub fn server_placement_request_symbol() -> Symbol {
98 Symbol::qualified("stream/fabric", "server-placement-request")
99}
100
101pub fn server_placement_result_symbol() -> Symbol {
103 Symbol::qualified("stream/fabric", "server-placement-result")
104}
105
106pub fn server_placement_refusal_symbol() -> Symbol {
108 Symbol::qualified("stream/fabric", "server-placement-refusal")
109}
110
111pub fn server_placement_frame_operation_symbols() -> [Symbol; 3] {
113 [
114 server_placement_request_symbol(),
115 server_placement_result_symbol(),
116 server_placement_refusal_symbol(),
117 ]
118}
119
120pub fn server_placement_refusal_diagnostic_kind() -> Symbol {
122 Symbol::qualified("stream/fabric", "ServerPlacementRefused")
123}
124
125pub fn server_buffered_preview_profile() -> TransportProfile {
130 TransportProfile::new(
131 Symbol::qualified("stream/profile", "server-buffered-preview"),
132 LatencyClass::BufferedPreview,
133 vec![
134 StreamCapability::Remote,
135 StreamCapability::Bounded,
136 StreamCapability::Preview,
137 StreamCapability::Lossy,
138 ],
139 )
140 .expect("server-buffered-preview stream profile is valid")
141}
142
143pub fn server_render_return_profile() -> TransportProfile {
148 TransportProfile::new(
149 Symbol::qualified("stream/profile", "server-render-return"),
150 LatencyClass::OfflineRender,
151 vec![
152 StreamCapability::Remote,
153 StreamCapability::Bounded,
154 StreamCapability::Deterministic,
155 StreamCapability::Replayable,
156 StreamCapability::Resumable,
157 ],
158 )
159 .expect("server-render-return stream profile is valid")
160}
161
162pub fn server_placement_stream_frames(
172 cx: &mut Cx,
173 site: &dyn Site,
174 fragment: PlacedFragment,
175 request: ServerPlacementRequest,
176 codec: Symbol,
177 frame_envelope: FrameEnvelope,
178) -> Result<Vec<ServerFrame>> {
179 cx.require(&placement_run_node_on_server_capability())?;
180 cx.require(&stream_remote_network_capability())?;
181 require_host_device_access(cx, &fragment, &request)?;
182 let limits = request.resource_limits();
183 limits.validate()?;
184 let metadata = placement_metadata(&fragment, limits);
185 let mut frames = vec![stream_frame_from_expr(
186 cx,
187 codec.clone(),
188 FrameKind::StreamStart,
189 &metadata.table_expr(),
190 frame_envelope.clone(),
191 )?];
192 let mut accounted_payload_bytes = frames
193 .first()
194 .map(|frame| frame.payload.len())
195 .unwrap_or_default();
196 let mut data_frame_count = 0_usize;
197 let mut sequence = 0_u64;
198 let request_frame = data_frame(
199 cx,
200 codec.clone(),
201 &metadata,
202 sequence,
203 server_placement_request_symbol(),
204 request_report_expr(&fragment, &request),
205 frame_envelope.clone(),
206 )?;
207 if let Some(message) = limit_cutoff_message(
208 &request_frame,
209 limits,
210 &mut accounted_payload_bytes,
211 data_frame_count,
212 frames.len(),
213 ) {
214 return finish_with_refusal(
215 cx,
216 codec,
217 &metadata,
218 sequence,
219 message,
220 frame_envelope,
221 frames,
222 );
223 }
224 frames.push(request_frame);
225 data_frame_count = data_frame_count.saturating_add(1);
226 sequence = sequence.saturating_add(1);
227
228 if request.realtime_pinned {
229 return finish_with_refusal(
230 cx,
231 codec,
232 &metadata,
233 sequence,
234 format!(
235 "server placement refused fragment {}: stream/site/server cannot host a realtime-pinned node",
236 redacted_fragment_label(&fragment)
237 ),
238 frame_envelope,
239 frames,
240 );
241 }
242
243 require_server_output_profile(cx, request.output_profile())?;
244 site.accept_input_edges(fragment.input_edges())?;
245 let reply = site.realize_fragment_node_with_timeout(
246 cx,
247 &fragment,
248 Some(Duration::from_millis(limits.max_cpu_time_ms)),
249 )?;
250 let payload = reply.value.object().as_expr(cx)?;
251 let result_frame = data_frame(
252 cx,
253 codec.clone(),
254 &metadata,
255 sequence,
256 server_placement_result_symbol(),
257 result_report_expr(&fragment, payload, request.output_profile()),
258 frame_envelope.clone(),
259 )?;
260 if let Some(message) = limit_cutoff_message(
261 &result_frame,
262 limits,
263 &mut accounted_payload_bytes,
264 data_frame_count,
265 frames.len(),
266 ) {
267 return finish_with_refusal(
268 cx,
269 codec,
270 &metadata,
271 sequence,
272 message,
273 frame_envelope,
274 frames,
275 );
276 }
277 frames.push(result_frame);
278 data_frame_count = data_frame_count.saturating_add(1);
279 sequence = sequence.saturating_add(1);
280
281 for envelope in fragment.output_envelopes() {
282 let output = envelope_with_profile(&envelope, request.output_profile().clone())?;
283 let frame = stream_chunk_frame_from_expr(
284 cx,
285 codec.clone(),
286 &output.to_expr(),
287 frame_envelope.clone(),
288 )?;
289 if let Some(message) = limit_cutoff_message(
290 &frame,
291 limits,
292 &mut accounted_payload_bytes,
293 data_frame_count,
294 frames.len(),
295 ) {
296 return finish_with_refusal(
297 cx,
298 codec,
299 &metadata,
300 sequence,
301 message,
302 frame_envelope,
303 frames,
304 );
305 }
306 frames.push(frame);
307 data_frame_count = data_frame_count.saturating_add(1);
308 sequence = sequence.saturating_add(1);
309 }
310
311 frames.push(stream_end_frame(codec, frame_envelope));
312 Ok(frames)
313}
314
315fn placement_metadata(
316 fragment: &PlacedFragment,
317 limits: PlacementResourceLimits,
318) -> StreamMetadata {
319 StreamMetadata::new(
320 redact_placement_symbol(&Symbol::qualified(
321 "stream/server-placement",
322 fragment.id().as_qualified_str(),
323 )),
324 StreamMedia::Data,
325 StreamDirection::Source,
326 ClockDomain::ServerFrame.symbol(),
327 BufferPolicy::bounded(limits.max_inflight_work)
328 .expect("validated placement inflight-work limit is nonzero"),
329 )
330}
331
332fn require_server_output_profile(cx: &mut Cx, profile: &TransportProfile) -> Result<()> {
333 if profile.has_capability(StreamCapability::Realtime)
334 || profile.latency_class() == LatencyClass::SampleExact
335 {
336 return Err(Error::Eval(format!(
337 "server placement refuses realtime profile {}",
338 profile.name().as_qualified_str()
339 )));
340 }
341 match profile.latency_class() {
342 LatencyClass::BufferedPreview => cx.require(&stream_remote_preview_capability()),
343 LatencyClass::OfflineRender => cx.require(&placement_remote_render_capability()),
344 LatencyClass::Interactive => Ok(()),
345 latency => Err(Error::Eval(format!(
346 "server placement does not support {} latency",
347 latency.wire_label()
348 ))),
349 }
350}
351
352fn data_frame(
353 cx: &mut Cx,
354 codec: Symbol,
355 metadata: &StreamMetadata,
356 sequence: u64,
357 kind: Symbol,
358 payload: Expr,
359 frame_envelope: FrameEnvelope,
360) -> Result<ServerFrame> {
361 let packet = StreamPacket::data(kind, payload);
362 let envelope = StreamEnvelope::new(
363 metadata.id().clone(),
364 packet_id(metadata.id(), sequence),
365 StreamMedia::Data,
366 StreamDirection::Source,
367 sequence,
368 Vec::new(),
369 ClockDomain::ServerFrame,
370 TransportProfile::remote_stream_fabric(),
371 Vec::new(),
372 packet,
373 )?;
374 stream_chunk_frame_from_expr(cx, codec, &envelope.to_expr(), frame_envelope)
375}
376
377fn refusal_frame(
378 cx: &mut Cx,
379 codec: Symbol,
380 metadata: &StreamMetadata,
381 sequence: u64,
382 message: String,
383 frame_envelope: FrameEnvelope,
384) -> Result<ServerFrame> {
385 let kind = server_placement_refusal_diagnostic_kind();
386 let packet = diagnostic_stream_packet(kind.clone(), message);
387 let envelope = StreamEnvelope::new(
388 metadata.id().clone(),
389 packet_id(metadata.id(), sequence),
390 StreamMedia::Diagnostic,
391 StreamDirection::Source,
392 sequence,
393 Vec::new(),
394 ClockDomain::ServerFrame,
395 TransportProfile::remote_stream_fabric(),
396 vec![kind],
397 packet,
398 )?;
399 stream_chunk_frame_from_expr(cx, codec, &envelope.to_expr(), frame_envelope)
400}
401
402fn finish_with_refusal(
403 cx: &mut Cx,
404 codec: Symbol,
405 metadata: &StreamMetadata,
406 sequence: u64,
407 message: String,
408 frame_envelope: FrameEnvelope,
409 mut frames: Vec<ServerFrame>,
410) -> Result<Vec<ServerFrame>> {
411 frames.push(refusal_frame(
412 cx,
413 codec.clone(),
414 metadata,
415 sequence,
416 message,
417 frame_envelope.clone(),
418 )?);
419 frames.push(stream_end_frame(codec, frame_envelope));
420 Ok(frames)
421}
422
423fn limit_cutoff_message(
424 frame: &ServerFrame,
425 limits: PlacementResourceLimits,
426 accounted_payload_bytes: &mut usize,
427 data_frame_count: usize,
428 frame_count: usize,
429) -> Option<String> {
430 if data_frame_count >= limits.effective_stream_frame_limit() {
431 return Some(format!(
432 "server placement resource cutoff: stream-size limit exceeded after {data_frame_count} chunks"
433 ));
434 }
435 if frame_count >= limits.max_inflight_work {
436 return Some(format!(
437 "server placement resource cutoff: inflight-work limit exceeded at {} frames",
438 limits.max_inflight_work
439 ));
440 }
441 if frame.payload.len() > limits.max_frame_payload_bytes {
442 return Some(format!(
443 "server placement resource cutoff: frame-size limit exceeded: {} bytes > {} bytes",
444 frame.payload.len(),
445 limits.max_frame_payload_bytes
446 ));
447 }
448 let next_payload_bytes = accounted_payload_bytes.saturating_add(frame.payload.len());
449 if next_payload_bytes > limits.max_memory_bytes {
450 return Some(format!(
451 "server placement resource cutoff: memory limit exceeded: {next_payload_bytes} bytes > {} bytes",
452 limits.max_memory_bytes
453 ));
454 }
455 *accounted_payload_bytes = next_payload_bytes;
456 None
457}
458
459fn require_host_device_access(
460 cx: &mut Cx,
461 fragment: &PlacedFragment,
462 request: &ServerPlacementRequest,
463) -> Result<()> {
464 if placement_uses_host_device(fragment, request) {
465 cx.require(&placement_host_device_capability())?;
466 }
467 Ok(())
468}
469
470fn envelope_with_profile(
471 envelope: &StreamEnvelope,
472 profile: TransportProfile,
473) -> Result<StreamEnvelope> {
474 StreamEnvelope::new_with_clock_domains(
475 envelope.stream_id().clone(),
476 envelope.packet_id().clone(),
477 envelope.media(),
478 envelope.direction(),
479 envelope.sequence(),
480 envelope.ticks().to_vec(),
481 envelope.clock_domain(),
482 envelope.clock_domains().to_vec(),
483 profile,
484 envelope.diagnostics().to_vec(),
485 envelope.packet().clone(),
486 )
487}
488
489fn redacted_fragment_label(fragment: &PlacedFragment) -> String {
490 redact_placement_symbol(fragment.id()).as_qualified_str()
491}
492
493fn packet_id(stream_id: &Symbol, sequence: u64) -> Symbol {
494 Symbol::qualified(
495 "stream/packet-id",
496 format!(
497 "{}#server-placement-{sequence}",
498 stream_id.as_qualified_str()
499 ),
500 )
501}