// sim_lib_stream_fabric/frames.rs

1use sim_kernel::{DatumStore, Event, EventKind, Ref, Result, Symbol, stream_surface};
2use sim_lib_server::{
3    FrameEnvelope, FrameKind, ServerFrame, stream_chunk_frame_from_expr, stream_end_frame,
4    stream_frame_from_expr, stream_frame_to_expr,
5};
6use sim_lib_stream_core::{
7    ClockDomain, StreamCassette, StreamDirection, StreamEnvelope, StreamItem, StreamMedia,
8    StreamMetadata, StreamRemoteLimits, StreamValue, TransportProfile,
9    stream_remote_network_capability,
10};
11
12use crate::events::{
13    diagnostic_stream_packet, error_message, metadata_from_expr, packet_and_ticks_from_remote_expr,
14    packet_ref, remote_error_packet, stream_limit_diagnostic_kind,
15};
16
/// Caps on how much output the stream-to-frame encoder may produce.
///
/// This is the fabric-side counterpart of `StreamRemoteLimits` from
/// `sim-lib-stream-core`. When the encoder crosses one of these bounds it
/// appends a limit diagnostic frame and stops instead of emitting an unbounded
/// run of chunks. The [`Default`] impl takes its values from the stream-core
/// remote defaults.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamFrameLimits {
    /// Largest encoded payload, in bytes, that a single frame may carry.
    pub max_frame_payload_bytes: usize,
    /// Upper bound on the number of chunk frames emitted for one stream.
    pub max_stream_frames: usize,
    /// Upper bound on the number of frames in flight at the same time.
    pub max_inflight_frames: usize,
    /// Longest stream duration, in milliseconds, before the stream is truncated.
    pub max_duration_ms: u64,
    /// Highest stream rate, in hertz; feeds the effective frame limit.
    pub max_rate_hz: u32,
}
36
37impl Default for StreamFrameLimits {
38    fn default() -> Self {
39        let limits = StreamRemoteLimits::default();
40        Self {
41            max_frame_payload_bytes: limits.max_frame_payload_bytes,
42            max_stream_frames: limits.max_stream_frames,
43            max_inflight_frames: limits.max_inflight_frames,
44            max_duration_ms: limits.max_duration_ms,
45            max_rate_hz: limits.max_rate_hz,
46        }
47    }
48}
49
50impl StreamFrameLimits {
51    /// Converts these fabric limits into a stream-core `StreamRemoteLimits`.
52    ///
53    /// The binary-payload bound is filled from the stream-core default.
54    pub fn remote_limits(self) -> StreamRemoteLimits {
55        StreamRemoteLimits {
56            max_frame_payload_bytes: self.max_frame_payload_bytes,
57            max_stream_frames: self.max_stream_frames,
58            max_inflight_frames: self.max_inflight_frames,
59            max_duration_ms: self.max_duration_ms,
60            max_rate_hz: self.max_rate_hz,
61            max_binary_payload_bytes: StreamRemoteLimits::default().max_binary_payload_bytes,
62        }
63    }
64}
65
66/// Encodes a stream into server frames with default envelope, profile, limits.
67///
68/// Convenience wrapper over [`stream_to_frames_with_envelope`] using
69/// `FrameEnvelope::default()`.
70pub fn stream_to_frames(
71    cx: &mut sim_kernel::Cx,
72    stream: &StreamValue,
73    codec: Symbol,
74) -> Result<Vec<ServerFrame>> {
75    stream_to_frames_with_envelope(cx, stream, codec, FrameEnvelope::default())
76}
77
78/// Encodes a stream into server frames under a caller-supplied frame envelope.
79///
80/// Uses the remote-stream-fabric transport profile and default
81/// [`StreamFrameLimits`]; see [`stream_to_frames_with_limits`] for the full
82/// surface.
83pub fn stream_to_frames_with_envelope(
84    cx: &mut sim_kernel::Cx,
85    stream: &StreamValue,
86    codec: Symbol,
87    envelope: FrameEnvelope,
88) -> Result<Vec<ServerFrame>> {
89    stream_to_frames_with_limits(
90        cx,
91        stream,
92        codec,
93        envelope,
94        TransportProfile::remote_stream_fabric(),
95        StreamFrameLimits::default(),
96    )
97}
98
99/// Encodes a stream into server frames under a caller-chosen transport profile.
100///
101/// Uses default [`StreamFrameLimits`]; see [`stream_to_frames_with_limits`].
102pub fn stream_to_frames_with_profile(
103    cx: &mut sim_kernel::Cx,
104    stream: &StreamValue,
105    codec: Symbol,
106    envelope: FrameEnvelope,
107    profile: TransportProfile,
108) -> Result<Vec<ServerFrame>> {
109    stream_to_frames_with_limits(
110        cx,
111        stream,
112        codec,
113        envelope,
114        profile,
115        StreamFrameLimits::default(),
116    )
117}
118
119/// Encodes a stream into server frames, enforcing every fabric resource bound.
120///
121/// Emits a `StreamStart` frame carrying metadata, one chunk frame per packet,
122/// and a terminating `StreamEnd` frame. Requires the remote-network capability.
123/// When any of `limits` (frame size, stream size, inflight count, duration or
124/// rate) is exceeded, a limit diagnostic frame is appended and encoding stops
125/// rather than producing an unbounded stream.
126pub fn stream_to_frames_with_limits(
127    cx: &mut sim_kernel::Cx,
128    stream: &StreamValue,
129    codec: Symbol,
130    envelope: FrameEnvelope,
131    profile: TransportProfile,
132    limits: StreamFrameLimits,
133) -> Result<Vec<ServerFrame>> {
134    cx.require(&stream_remote_network_capability())?;
135    let remote_limits = limits.remote_limits();
136    remote_limits.validate()?;
137    let effective_frame_limit = remote_limits.effective_frame_limit();
138    let mut frames = Vec::new();
139    frames.push(stream_frame_from_expr(
140        cx,
141        codec.clone(),
142        FrameKind::StreamStart,
143        &stream.metadata().table_expr(),
144        envelope.clone(),
145    )?);
146    let mut sequence = 0_u64;
147    while let Some(item) = stream.next_packet()? {
148        if sequence as usize >= effective_frame_limit {
149            frames.push(limit_diagnostic_frame(
150                cx,
151                codec.clone(),
152                stream.metadata(),
153                sequence,
154                stream_size_limit_message(&limits, effective_frame_limit),
155                envelope.clone(),
156            )?);
157            break;
158        }
159        if frames.len() >= limits.max_inflight_frames {
160            frames.push(limit_diagnostic_frame(
161                cx,
162                codec.clone(),
163                stream.metadata(),
164                sequence,
165                format!(
166                    "stream/fabric inflight-frame limit exceeded at {} frames",
167                    limits.max_inflight_frames
168                ),
169                envelope.clone(),
170            )?);
171            break;
172        }
173        let frame = envelope_chunk_frame(
174            cx,
175            codec.clone(),
176            stream.metadata(),
177            sequence,
178            &item,
179            profile.clone(),
180            envelope.clone(),
181        )?;
182        if frame.payload.len() > limits.max_frame_payload_bytes {
183            frames.push(limit_diagnostic_frame(
184                cx,
185                codec.clone(),
186                stream.metadata(),
187                sequence,
188                format!(
189                    "stream/fabric frame-size limit exceeded: {} bytes > {} bytes",
190                    frame.payload.len(),
191                    limits.max_frame_payload_bytes
192                ),
193                envelope.clone(),
194            )?);
195            break;
196        }
197        frames.push(frame);
198        sequence = sequence.saturating_add(1);
199    }
200    frames.push(stream_end_frame(codec, envelope));
201    Ok(frames)
202}
203
204fn stream_size_limit_message(limits: &StreamFrameLimits, effective_frame_limit: usize) -> String {
205    if effective_frame_limit < limits.max_stream_frames {
206        format!("stream/fabric duration-rate limit exceeded after {effective_frame_limit} chunks")
207    } else {
208        format!(
209            "stream/fabric stream-size limit exceeded after {} chunks",
210            limits.max_stream_frames
211        )
212    }
213}
214
215/// Compatibility helper. New code should call
216/// `sim_lib_server::stream_chunk_frame_from_expr` directly.
217pub fn expr_to_stream_chunk_frame(
218    cx: &mut sim_kernel::Cx,
219    codec: Symbol,
220    expr: sim_kernel::Expr,
221    envelope: FrameEnvelope,
222) -> Result<ServerFrame> {
223    stream_chunk_frame_from_expr(cx, codec, &expr, envelope)
224}
225
226/// Compatibility helper. New code should call
227/// `sim_lib_server::stream_frame_to_expr` directly.
228pub fn stream_chunk_frame_to_expr(
229    cx: &mut sim_kernel::Cx,
230    frame: &ServerFrame,
231) -> Result<sim_kernel::Expr> {
232    if frame.kind != FrameKind::StreamChunk {
233        return Err(sim_kernel::Error::Eval(format!(
234            "remote stream adapter expected stream chunk frame, got {}",
235            frame.kind.as_symbol()
236        )));
237    }
238    let Some(expr) = stream_frame_to_expr(cx, frame)? else {
239        return Err(sim_kernel::Error::Eval(
240            "stream chunk frame did not decode to a payload".to_owned(),
241        ));
242    };
243    Ok(expr)
244}
245
246/// Decodes a buffer of server frames back into a lazy [`StreamValue`].
247///
248/// Reconstructs metadata from the `StreamStart` frame, turns the remaining
249/// frames into events, and folds them into a pull-based stream.
250pub fn stream_frames_to_stream(
251    cx: &mut sim_kernel::Cx,
252    frames: &[ServerFrame],
253) -> Result<StreamValue> {
254    let run = Ref::Symbol(Symbol::qualified("stream/fabric", "remote-run"));
255    let (metadata, events) = stream_frames_to_events(cx, frames, run)?;
256    crate::event_buffer_to_stream(cx, metadata, events)
257}
258
259/// Decodes server frames into a replayable [`StreamCassette`].
260///
261/// Recovers the stream via [`stream_frames_to_stream`] and records it under the
262/// remote-stream-fabric transport profile.
263pub fn stream_frames_to_cassette(
264    cx: &mut sim_kernel::Cx,
265    frames: &[ServerFrame],
266) -> Result<StreamCassette> {
267    let stream = stream_frames_to_stream(cx, frames)?;
268    StreamCassette::from_stream_value(&stream, TransportProfile::remote_stream_fabric())
269}
270
271/// Re-encodes a recorded [`StreamCassette`] back into server frames.
272///
273/// Replays the cassette and re-emits it under the remote-stream-fabric profile.
274pub fn cassette_to_stream_frames(
275    cx: &mut sim_kernel::Cx,
276    cassette: &StreamCassette,
277    codec: Symbol,
278    envelope: FrameEnvelope,
279) -> Result<Vec<ServerFrame>> {
280    let stream = cassette.replay_stream_value()?;
281    stream_to_frames_with_profile(
282        cx,
283        &stream,
284        codec,
285        envelope,
286        TransportProfile::remote_stream_fabric(),
287    )
288}
289
290/// Decodes server frames into stream metadata plus a buffer of run events.
291///
292/// The `StreamStart` frame supplies metadata; chunk, end, and error frames
293/// become events attributed to `run`, with decoding stopping at the first
294/// `Done`. Returns an error if no `StreamStart` metadata frame is present.
295pub fn stream_frames_to_events(
296    cx: &mut sim_kernel::Cx,
297    frames: &[ServerFrame],
298    run: Ref,
299) -> Result<(StreamMetadata, Vec<Event>)> {
300    let mut metadata = None;
301    let mut events = Vec::new();
302    let mut seq = 0u64;
303    for frame in frames {
304        match frame.kind {
305            FrameKind::StreamStart => {
306                let expr = frame.decode_expr(cx, sim_kernel::ReadPolicy::default())?;
307                metadata = Some(metadata_from_expr(&expr)?);
308            }
309            FrameKind::StreamChunk | FrameKind::StreamEnd | FrameKind::Error => {
310                if let Some(event) = remote_frame_to_event(cx, run.clone(), seq, frame)? {
311                    let done = matches!(event.kind, EventKind::Done);
312                    events.push(event);
313                    seq = seq.saturating_add(1);
314                    if done {
315                        break;
316                    }
317                }
318            }
319            _ => {
320                return Err(sim_kernel::Error::Eval(format!(
321                    "remote stream adapter cannot consume frame kind {}",
322                    frame.kind.as_symbol()
323                )));
324            }
325        }
326    }
327    let metadata = metadata.ok_or_else(|| {
328        sim_kernel::Error::Eval("remote stream frames missing StreamStart metadata".to_owned())
329    })?;
330    Ok((metadata, events))
331}
332
333/// Converts a single remote server frame into a run event, if it carries one.
334///
335/// Chunk frames yield a remote stream-frame event, end frames yield `Done`, and
336/// error frames yield a remote-error diagnostic event. `StreamStart` frames
337/// carry no event and return `None`.
338pub fn remote_frame_to_event(
339    cx: &mut sim_kernel::Cx,
340    run: Ref,
341    seq: u64,
342    frame: &ServerFrame,
343) -> Result<Option<Event>> {
344    match frame.kind {
345        FrameKind::StreamChunk => {
346            let Some(expr) = stream_frame_to_expr(cx, frame)? else {
347                return Err(sim_kernel::Error::Eval(
348                    "stream chunk frame did not decode to a payload".to_owned(),
349                ));
350            };
351            let (packet, ticks) = packet_and_ticks_from_remote_expr(expr);
352            Ok(Some(stream_surface::remote_stream_frame_event(
353                run,
354                seq,
355                ticks,
356                packet_ref(cx, &packet)?,
357            )?))
358        }
359        FrameKind::StreamEnd => Ok(Some(Event::done(run, seq)?)),
360        FrameKind::Error => {
361            let packet = remote_error_packet(remote_error_message(cx, frame));
362            Ok(Some(stream_surface::remote_stream_frame_event(
363                run,
364                seq,
365                Vec::new(),
366                packet_ref(cx, &packet)?,
367            )?))
368        }
369        FrameKind::StreamStart => Ok(None),
370        _ => Err(sim_kernel::Error::Eval(format!(
371            "remote stream adapter cannot convert frame kind {}",
372            frame.kind.as_symbol()
373        ))),
374    }
375}
376
377fn remote_error_message(cx: &mut sim_kernel::Cx, frame: &ServerFrame) -> String {
378    match frame.decode_expr(cx, sim_kernel::ReadPolicy::default()) {
379        Ok(expr) => match sim_kernel::Datum::try_from(expr) {
380            Ok(datum) => match cx.datum_store_mut().intern(datum) {
381                Ok(id) => error_message(cx, &Ref::Content(id)),
382                Err(err) => err.to_string(),
383            },
384            Err(err) => err.to_string(),
385        },
386        Err(err) => err.to_string(),
387    }
388}
389
390fn envelope_chunk_frame(
391    cx: &mut sim_kernel::Cx,
392    codec: Symbol,
393    metadata: &StreamMetadata,
394    sequence: u64,
395    item: &StreamItem,
396    profile: TransportProfile,
397    frame_envelope: FrameEnvelope,
398) -> Result<ServerFrame> {
399    let envelope = StreamEnvelope::from_item_with_profile(metadata, sequence, item, profile)?;
400    stream_chunk_frame_from_expr(cx, codec, &envelope.to_expr(), frame_envelope)
401}
402
403fn limit_diagnostic_frame(
404    cx: &mut sim_kernel::Cx,
405    codec: Symbol,
406    metadata: &StreamMetadata,
407    sequence: u64,
408    message: String,
409    frame_envelope: FrameEnvelope,
410) -> Result<ServerFrame> {
411    let kind = stream_limit_diagnostic_kind();
412    let packet = diagnostic_stream_packet(kind.clone(), message);
413    let envelope = StreamEnvelope::new(
414        metadata.id().clone(),
415        Symbol::qualified(
416            "stream/packet-id",
417            format!("{}#diagnostic-{sequence}", metadata.id().as_qualified_str()),
418        ),
419        StreamMedia::Diagnostic,
420        StreamDirection::Source,
421        sequence,
422        Vec::new(),
423        ClockDomain::ServerFrame,
424        TransportProfile::remote_stream_fabric(),
425        vec![kind],
426        packet,
427    )?;
428    stream_chunk_frame_from_expr(cx, codec, &envelope.to_expr(), frame_envelope)
429}