Skip to main content

sim_lib_stream_fabric/
placement.rs

1use std::time::Duration;
2
3use sim_kernel::{Cx, Error, Expr, Result, Symbol};
4use sim_lib_server::{
5    FrameEnvelope, FrameKind, ServerFrame, Site, stream_chunk_frame_from_expr, stream_end_frame,
6    stream_frame_from_expr,
7};
8use sim_lib_stream_core::{
9    BufferPolicy, ClockDomain, LatencyClass, PlacedFragment, StreamCapability, StreamDirection,
10    StreamEnvelope, StreamMedia, StreamMetadata, StreamPacket, TransportProfile,
11    stream_remote_network_capability, stream_remote_preview_capability,
12};
13
14use crate::events::diagnostic_stream_packet;
15use crate::placement_report::{
16    placement_uses_host_device, request_report_expr, result_report_expr,
17};
18use crate::placement_security::{
19    PlacementResourceLimits, placement_host_device_capability, placement_remote_render_capability,
20    placement_run_node_on_server_capability, redact_placement_symbol,
21};
22
23/// Server placement request carried through stream-fabric server frames.
24#[derive(Clone, Debug, PartialEq, Eq)]
25pub struct ServerPlacementRequest {
26    placement_report: Expr,
27    output_profile: TransportProfile,
28    realtime_pinned: bool,
29    resource_limits: PlacementResourceLimits,
30}
31
32impl ServerPlacementRequest {
33    /// Creates a request for `placement_report` declaring `output_profile`.
34    ///
35    /// Realtime pinning is off and resource limits start at their defaults.
36    pub fn new(placement_report: Expr, output_profile: TransportProfile) -> Self {
37        Self {
38            placement_report,
39            output_profile,
40            realtime_pinned: false,
41            resource_limits: PlacementResourceLimits::default(),
42        }
43    }
44
45    /// Creates a request whose output uses the server buffered-preview profile.
46    pub fn buffered_preview(placement_report: Expr) -> Self {
47        Self::new(placement_report, server_buffered_preview_profile())
48    }
49
50    /// Creates a request whose output uses the server render-return profile.
51    pub fn render_return(placement_report: Expr) -> Self {
52        Self::new(placement_report, server_render_return_profile())
53    }
54
55    /// Returns this request with realtime pinning set to `realtime_pinned`.
56    ///
57    /// A realtime-pinned node cannot be hosted on the server site and is
58    /// refused during placement.
59    pub fn with_realtime_pin(mut self, realtime_pinned: bool) -> Self {
60        self.realtime_pinned = realtime_pinned;
61        self
62    }
63
64    /// Returns this request with its resource limits replaced.
65    pub fn with_resource_limits(mut self, resource_limits: PlacementResourceLimits) -> Self {
66        self.resource_limits = resource_limits;
67        self
68    }
69
70    /// Returns the placement report `Expr` describing the work to place.
71    pub fn placement_report(&self) -> &Expr {
72        &self.placement_report
73    }
74
75    /// Returns the declared output transport profile.
76    pub fn output_profile(&self) -> &TransportProfile {
77        &self.output_profile
78    }
79
80    /// Returns whether the placed node is pinned to realtime execution.
81    pub fn realtime_pinned(&self) -> bool {
82        self.realtime_pinned
83    }
84
85    /// Returns the resource limits governing this placement.
86    pub fn resource_limits(&self) -> PlacementResourceLimits {
87        self.resource_limits
88    }
89}
90
91/// Returns the site symbol naming the server placement target.
92pub fn server_site_symbol() -> Symbol {
93    Symbol::qualified("stream/site", "server")
94}
95
96/// Returns the operation symbol for a server placement request data frame.
97pub fn server_placement_request_symbol() -> Symbol {
98    Symbol::qualified("stream/fabric", "server-placement-request")
99}
100
101/// Returns the operation symbol for a server placement result data frame.
102pub fn server_placement_result_symbol() -> Symbol {
103    Symbol::qualified("stream/fabric", "server-placement-result")
104}
105
106/// Returns the operation symbol for a server placement refusal data frame.
107pub fn server_placement_refusal_symbol() -> Symbol {
108    Symbol::qualified("stream/fabric", "server-placement-refusal")
109}
110
111/// Returns the request, result, and refusal placement operation symbols.
112pub fn server_placement_frame_operation_symbols() -> [Symbol; 3] {
113    [
114        server_placement_request_symbol(),
115        server_placement_result_symbol(),
116        server_placement_refusal_symbol(),
117    ]
118}
119
120/// Returns the diagnostic kind emitted when server placement is refused.
121pub fn server_placement_refusal_diagnostic_kind() -> Symbol {
122    Symbol::qualified("stream/fabric", "ServerPlacementRefused")
123}
124
125/// Returns the server buffered-preview output profile.
126///
127/// A remote, bounded, lossy preview profile in the buffered-preview latency
128/// class, suitable for server-hosted placement that streams a preview back.
129pub fn server_buffered_preview_profile() -> TransportProfile {
130    TransportProfile::new(
131        Symbol::qualified("stream/profile", "server-buffered-preview"),
132        LatencyClass::BufferedPreview,
133        vec![
134            StreamCapability::Remote,
135            StreamCapability::Bounded,
136            StreamCapability::Preview,
137            StreamCapability::Lossy,
138        ],
139    )
140    .expect("server-buffered-preview stream profile is valid")
141}
142
143/// Returns the server render-return output profile.
144///
145/// A remote, bounded, deterministic, replayable, resumable profile in the
146/// offline-render latency class, suitable for returning a fully rendered result.
147pub fn server_render_return_profile() -> TransportProfile {
148    TransportProfile::new(
149        Symbol::qualified("stream/profile", "server-render-return"),
150        LatencyClass::OfflineRender,
151        vec![
152            StreamCapability::Remote,
153            StreamCapability::Bounded,
154            StreamCapability::Deterministic,
155            StreamCapability::Replayable,
156            StreamCapability::Resumable,
157        ],
158    )
159    .expect("server-render-return stream profile is valid")
160}
161
162/// Places a fragment on the server site and encodes the exchange as frames.
163///
164/// Requires the run-node-on-server and remote-network capabilities (plus
165/// host-device access when the fragment touches a host device), validates the
166/// request's resource limits, then emits a `StreamStart` frame, a redacted
167/// request-report frame, a result-report frame for the realized node, one chunk
168/// frame per output envelope, and a closing `StreamEnd` frame. Realtime-pinned
169/// requests and any resource-limit breach short-circuit to a refusal frame
170/// followed by `StreamEnd` instead of executing the node.
171pub fn server_placement_stream_frames(
172    cx: &mut Cx,
173    site: &dyn Site,
174    fragment: PlacedFragment,
175    request: ServerPlacementRequest,
176    codec: Symbol,
177    frame_envelope: FrameEnvelope,
178) -> Result<Vec<ServerFrame>> {
179    cx.require(&placement_run_node_on_server_capability())?;
180    cx.require(&stream_remote_network_capability())?;
181    require_host_device_access(cx, &fragment, &request)?;
182    let limits = request.resource_limits();
183    limits.validate()?;
184    let metadata = placement_metadata(&fragment, limits);
185    let mut frames = vec![stream_frame_from_expr(
186        cx,
187        codec.clone(),
188        FrameKind::StreamStart,
189        &metadata.table_expr(),
190        frame_envelope.clone(),
191    )?];
192    let mut accounted_payload_bytes = frames
193        .first()
194        .map(|frame| frame.payload.len())
195        .unwrap_or_default();
196    let mut data_frame_count = 0_usize;
197    let mut sequence = 0_u64;
198    let request_frame = data_frame(
199        cx,
200        codec.clone(),
201        &metadata,
202        sequence,
203        server_placement_request_symbol(),
204        request_report_expr(&fragment, &request),
205        frame_envelope.clone(),
206    )?;
207    if let Some(message) = limit_cutoff_message(
208        &request_frame,
209        limits,
210        &mut accounted_payload_bytes,
211        data_frame_count,
212        frames.len(),
213    ) {
214        return finish_with_refusal(
215            cx,
216            codec,
217            &metadata,
218            sequence,
219            message,
220            frame_envelope,
221            frames,
222        );
223    }
224    frames.push(request_frame);
225    data_frame_count = data_frame_count.saturating_add(1);
226    sequence = sequence.saturating_add(1);
227
228    if request.realtime_pinned {
229        return finish_with_refusal(
230            cx,
231            codec,
232            &metadata,
233            sequence,
234            format!(
235                "server placement refused fragment {}: stream/site/server cannot host a realtime-pinned node",
236                redacted_fragment_label(&fragment)
237            ),
238            frame_envelope,
239            frames,
240        );
241    }
242
243    require_server_output_profile(cx, request.output_profile())?;
244    site.accept_input_edges(fragment.input_edges())?;
245    let reply = site.realize_fragment_node_with_timeout(
246        cx,
247        &fragment,
248        Some(Duration::from_millis(limits.max_cpu_time_ms)),
249    )?;
250    let payload = reply.value.object().as_expr(cx)?;
251    let result_frame = data_frame(
252        cx,
253        codec.clone(),
254        &metadata,
255        sequence,
256        server_placement_result_symbol(),
257        result_report_expr(&fragment, payload, request.output_profile()),
258        frame_envelope.clone(),
259    )?;
260    if let Some(message) = limit_cutoff_message(
261        &result_frame,
262        limits,
263        &mut accounted_payload_bytes,
264        data_frame_count,
265        frames.len(),
266    ) {
267        return finish_with_refusal(
268            cx,
269            codec,
270            &metadata,
271            sequence,
272            message,
273            frame_envelope,
274            frames,
275        );
276    }
277    frames.push(result_frame);
278    data_frame_count = data_frame_count.saturating_add(1);
279    sequence = sequence.saturating_add(1);
280
281    for envelope in fragment.output_envelopes() {
282        let output = envelope_with_profile(&envelope, request.output_profile().clone())?;
283        let frame = stream_chunk_frame_from_expr(
284            cx,
285            codec.clone(),
286            &output.to_expr(),
287            frame_envelope.clone(),
288        )?;
289        if let Some(message) = limit_cutoff_message(
290            &frame,
291            limits,
292            &mut accounted_payload_bytes,
293            data_frame_count,
294            frames.len(),
295        ) {
296            return finish_with_refusal(
297                cx,
298                codec,
299                &metadata,
300                sequence,
301                message,
302                frame_envelope,
303                frames,
304            );
305        }
306        frames.push(frame);
307        data_frame_count = data_frame_count.saturating_add(1);
308        sequence = sequence.saturating_add(1);
309    }
310
311    frames.push(stream_end_frame(codec, frame_envelope));
312    Ok(frames)
313}
314
315fn placement_metadata(
316    fragment: &PlacedFragment,
317    limits: PlacementResourceLimits,
318) -> StreamMetadata {
319    StreamMetadata::new(
320        redact_placement_symbol(&Symbol::qualified(
321            "stream/server-placement",
322            fragment.id().as_qualified_str(),
323        )),
324        StreamMedia::Data,
325        StreamDirection::Source,
326        ClockDomain::ServerFrame.symbol(),
327        BufferPolicy::bounded(limits.max_inflight_work)
328            .expect("validated placement inflight-work limit is nonzero"),
329    )
330}
331
332fn require_server_output_profile(cx: &mut Cx, profile: &TransportProfile) -> Result<()> {
333    if profile.has_capability(StreamCapability::Realtime)
334        || profile.latency_class() == LatencyClass::SampleExact
335    {
336        return Err(Error::Eval(format!(
337            "server placement refuses realtime profile {}",
338            profile.name().as_qualified_str()
339        )));
340    }
341    match profile.latency_class() {
342        LatencyClass::BufferedPreview => cx.require(&stream_remote_preview_capability()),
343        LatencyClass::OfflineRender => cx.require(&placement_remote_render_capability()),
344        LatencyClass::Interactive => Ok(()),
345        latency => Err(Error::Eval(format!(
346            "server placement does not support {} latency",
347            latency.wire_label()
348        ))),
349    }
350}
351
352fn data_frame(
353    cx: &mut Cx,
354    codec: Symbol,
355    metadata: &StreamMetadata,
356    sequence: u64,
357    kind: Symbol,
358    payload: Expr,
359    frame_envelope: FrameEnvelope,
360) -> Result<ServerFrame> {
361    let packet = StreamPacket::data(kind, payload);
362    let envelope = StreamEnvelope::new(
363        metadata.id().clone(),
364        packet_id(metadata.id(), sequence),
365        StreamMedia::Data,
366        StreamDirection::Source,
367        sequence,
368        Vec::new(),
369        ClockDomain::ServerFrame,
370        TransportProfile::remote_stream_fabric(),
371        Vec::new(),
372        packet,
373    )?;
374    stream_chunk_frame_from_expr(cx, codec, &envelope.to_expr(), frame_envelope)
375}
376
377fn refusal_frame(
378    cx: &mut Cx,
379    codec: Symbol,
380    metadata: &StreamMetadata,
381    sequence: u64,
382    message: String,
383    frame_envelope: FrameEnvelope,
384) -> Result<ServerFrame> {
385    let kind = server_placement_refusal_diagnostic_kind();
386    let packet = diagnostic_stream_packet(kind.clone(), message);
387    let envelope = StreamEnvelope::new(
388        metadata.id().clone(),
389        packet_id(metadata.id(), sequence),
390        StreamMedia::Diagnostic,
391        StreamDirection::Source,
392        sequence,
393        Vec::new(),
394        ClockDomain::ServerFrame,
395        TransportProfile::remote_stream_fabric(),
396        vec![kind],
397        packet,
398    )?;
399    stream_chunk_frame_from_expr(cx, codec, &envelope.to_expr(), frame_envelope)
400}
401
402fn finish_with_refusal(
403    cx: &mut Cx,
404    codec: Symbol,
405    metadata: &StreamMetadata,
406    sequence: u64,
407    message: String,
408    frame_envelope: FrameEnvelope,
409    mut frames: Vec<ServerFrame>,
410) -> Result<Vec<ServerFrame>> {
411    frames.push(refusal_frame(
412        cx,
413        codec.clone(),
414        metadata,
415        sequence,
416        message,
417        frame_envelope.clone(),
418    )?);
419    frames.push(stream_end_frame(codec, frame_envelope));
420    Ok(frames)
421}
422
423fn limit_cutoff_message(
424    frame: &ServerFrame,
425    limits: PlacementResourceLimits,
426    accounted_payload_bytes: &mut usize,
427    data_frame_count: usize,
428    frame_count: usize,
429) -> Option<String> {
430    if data_frame_count >= limits.effective_stream_frame_limit() {
431        return Some(format!(
432            "server placement resource cutoff: stream-size limit exceeded after {data_frame_count} chunks"
433        ));
434    }
435    if frame_count >= limits.max_inflight_work {
436        return Some(format!(
437            "server placement resource cutoff: inflight-work limit exceeded at {} frames",
438            limits.max_inflight_work
439        ));
440    }
441    if frame.payload.len() > limits.max_frame_payload_bytes {
442        return Some(format!(
443            "server placement resource cutoff: frame-size limit exceeded: {} bytes > {} bytes",
444            frame.payload.len(),
445            limits.max_frame_payload_bytes
446        ));
447    }
448    let next_payload_bytes = accounted_payload_bytes.saturating_add(frame.payload.len());
449    if next_payload_bytes > limits.max_memory_bytes {
450        return Some(format!(
451            "server placement resource cutoff: memory limit exceeded: {next_payload_bytes} bytes > {} bytes",
452            limits.max_memory_bytes
453        ));
454    }
455    *accounted_payload_bytes = next_payload_bytes;
456    None
457}
458
459fn require_host_device_access(
460    cx: &mut Cx,
461    fragment: &PlacedFragment,
462    request: &ServerPlacementRequest,
463) -> Result<()> {
464    if placement_uses_host_device(fragment, request) {
465        cx.require(&placement_host_device_capability())?;
466    }
467    Ok(())
468}
469
470fn envelope_with_profile(
471    envelope: &StreamEnvelope,
472    profile: TransportProfile,
473) -> Result<StreamEnvelope> {
474    StreamEnvelope::new_with_clock_domains(
475        envelope.stream_id().clone(),
476        envelope.packet_id().clone(),
477        envelope.media(),
478        envelope.direction(),
479        envelope.sequence(),
480        envelope.ticks().to_vec(),
481        envelope.clock_domain(),
482        envelope.clock_domains().to_vec(),
483        profile,
484        envelope.diagnostics().to_vec(),
485        envelope.packet().clone(),
486    )
487}
488
489fn redacted_fragment_label(fragment: &PlacedFragment) -> String {
490    redact_placement_symbol(fragment.id()).as_qualified_str()
491}
492
493fn packet_id(stream_id: &Symbol, sequence: u64) -> Symbol {
494    Symbol::qualified(
495        "stream/packet-id",
496        format!(
497            "{}#server-placement-{sequence}",
498            stream_id.as_qualified_str()
499        ),
500    )
501}