1use sim_kernel::{DatumStore, Event, EventKind, Ref, Result, Symbol, stream_surface};
2use sim_lib_server::{
3 FrameEnvelope, FrameKind, ServerFrame, stream_chunk_frame_from_expr, stream_end_frame,
4 stream_frame_from_expr, stream_frame_to_expr,
5};
6use sim_lib_stream_core::{
7 ClockDomain, StreamCassette, StreamDirection, StreamEnvelope, StreamItem, StreamMedia,
8 StreamMetadata, StreamRemoteLimits, StreamValue, TransportProfile,
9 stream_remote_network_capability,
10};
11
12use crate::events::{
13 diagnostic_stream_packet, error_message, metadata_from_expr, packet_and_ticks_from_remote_expr,
14 packet_ref, remote_error_packet, stream_limit_diagnostic_kind,
15};
16
17#[derive(Clone, Copy, Debug, PartialEq, Eq)]
24pub struct StreamFrameLimits {
25 pub max_frame_payload_bytes: usize,
27 pub max_stream_frames: usize,
29 pub max_inflight_frames: usize,
31 pub max_duration_ms: u64,
33 pub max_rate_hz: u32,
35}
36
37impl Default for StreamFrameLimits {
38 fn default() -> Self {
39 let limits = StreamRemoteLimits::default();
40 Self {
41 max_frame_payload_bytes: limits.max_frame_payload_bytes,
42 max_stream_frames: limits.max_stream_frames,
43 max_inflight_frames: limits.max_inflight_frames,
44 max_duration_ms: limits.max_duration_ms,
45 max_rate_hz: limits.max_rate_hz,
46 }
47 }
48}
49
50impl StreamFrameLimits {
51 pub fn remote_limits(self) -> StreamRemoteLimits {
55 StreamRemoteLimits {
56 max_frame_payload_bytes: self.max_frame_payload_bytes,
57 max_stream_frames: self.max_stream_frames,
58 max_inflight_frames: self.max_inflight_frames,
59 max_duration_ms: self.max_duration_ms,
60 max_rate_hz: self.max_rate_hz,
61 max_binary_payload_bytes: StreamRemoteLimits::default().max_binary_payload_bytes,
62 }
63 }
64}
65
66pub fn stream_to_frames(
71 cx: &mut sim_kernel::Cx,
72 stream: &StreamValue,
73 codec: Symbol,
74) -> Result<Vec<ServerFrame>> {
75 stream_to_frames_with_envelope(cx, stream, codec, FrameEnvelope::default())
76}
77
78pub fn stream_to_frames_with_envelope(
84 cx: &mut sim_kernel::Cx,
85 stream: &StreamValue,
86 codec: Symbol,
87 envelope: FrameEnvelope,
88) -> Result<Vec<ServerFrame>> {
89 stream_to_frames_with_limits(
90 cx,
91 stream,
92 codec,
93 envelope,
94 TransportProfile::remote_stream_fabric(),
95 StreamFrameLimits::default(),
96 )
97}
98
99pub fn stream_to_frames_with_profile(
103 cx: &mut sim_kernel::Cx,
104 stream: &StreamValue,
105 codec: Symbol,
106 envelope: FrameEnvelope,
107 profile: TransportProfile,
108) -> Result<Vec<ServerFrame>> {
109 stream_to_frames_with_limits(
110 cx,
111 stream,
112 codec,
113 envelope,
114 profile,
115 StreamFrameLimits::default(),
116 )
117}
118
119pub fn stream_to_frames_with_limits(
127 cx: &mut sim_kernel::Cx,
128 stream: &StreamValue,
129 codec: Symbol,
130 envelope: FrameEnvelope,
131 profile: TransportProfile,
132 limits: StreamFrameLimits,
133) -> Result<Vec<ServerFrame>> {
134 cx.require(&stream_remote_network_capability())?;
135 let remote_limits = limits.remote_limits();
136 remote_limits.validate()?;
137 let effective_frame_limit = remote_limits.effective_frame_limit();
138 let mut frames = Vec::new();
139 frames.push(stream_frame_from_expr(
140 cx,
141 codec.clone(),
142 FrameKind::StreamStart,
143 &stream.metadata().table_expr(),
144 envelope.clone(),
145 )?);
146 let mut sequence = 0_u64;
147 while let Some(item) = stream.next_packet()? {
148 if sequence as usize >= effective_frame_limit {
149 frames.push(limit_diagnostic_frame(
150 cx,
151 codec.clone(),
152 stream.metadata(),
153 sequence,
154 stream_size_limit_message(&limits, effective_frame_limit),
155 envelope.clone(),
156 )?);
157 break;
158 }
159 if frames.len() >= limits.max_inflight_frames {
160 frames.push(limit_diagnostic_frame(
161 cx,
162 codec.clone(),
163 stream.metadata(),
164 sequence,
165 format!(
166 "stream/fabric inflight-frame limit exceeded at {} frames",
167 limits.max_inflight_frames
168 ),
169 envelope.clone(),
170 )?);
171 break;
172 }
173 let frame = envelope_chunk_frame(
174 cx,
175 codec.clone(),
176 stream.metadata(),
177 sequence,
178 &item,
179 profile.clone(),
180 envelope.clone(),
181 )?;
182 if frame.payload.len() > limits.max_frame_payload_bytes {
183 frames.push(limit_diagnostic_frame(
184 cx,
185 codec.clone(),
186 stream.metadata(),
187 sequence,
188 format!(
189 "stream/fabric frame-size limit exceeded: {} bytes > {} bytes",
190 frame.payload.len(),
191 limits.max_frame_payload_bytes
192 ),
193 envelope.clone(),
194 )?);
195 break;
196 }
197 frames.push(frame);
198 sequence = sequence.saturating_add(1);
199 }
200 frames.push(stream_end_frame(codec, envelope));
201 Ok(frames)
202}
203
204fn stream_size_limit_message(limits: &StreamFrameLimits, effective_frame_limit: usize) -> String {
205 if effective_frame_limit < limits.max_stream_frames {
206 format!("stream/fabric duration-rate limit exceeded after {effective_frame_limit} chunks")
207 } else {
208 format!(
209 "stream/fabric stream-size limit exceeded after {} chunks",
210 limits.max_stream_frames
211 )
212 }
213}
214
215pub fn expr_to_stream_chunk_frame(
218 cx: &mut sim_kernel::Cx,
219 codec: Symbol,
220 expr: sim_kernel::Expr,
221 envelope: FrameEnvelope,
222) -> Result<ServerFrame> {
223 stream_chunk_frame_from_expr(cx, codec, &expr, envelope)
224}
225
226pub fn stream_chunk_frame_to_expr(
229 cx: &mut sim_kernel::Cx,
230 frame: &ServerFrame,
231) -> Result<sim_kernel::Expr> {
232 if frame.kind != FrameKind::StreamChunk {
233 return Err(sim_kernel::Error::Eval(format!(
234 "remote stream adapter expected stream chunk frame, got {}",
235 frame.kind.as_symbol()
236 )));
237 }
238 let Some(expr) = stream_frame_to_expr(cx, frame)? else {
239 return Err(sim_kernel::Error::Eval(
240 "stream chunk frame did not decode to a payload".to_owned(),
241 ));
242 };
243 Ok(expr)
244}
245
246pub fn stream_frames_to_stream(
251 cx: &mut sim_kernel::Cx,
252 frames: &[ServerFrame],
253) -> Result<StreamValue> {
254 let run = Ref::Symbol(Symbol::qualified("stream/fabric", "remote-run"));
255 let (metadata, events) = stream_frames_to_events(cx, frames, run)?;
256 crate::event_buffer_to_stream(cx, metadata, events)
257}
258
259pub fn stream_frames_to_cassette(
264 cx: &mut sim_kernel::Cx,
265 frames: &[ServerFrame],
266) -> Result<StreamCassette> {
267 let stream = stream_frames_to_stream(cx, frames)?;
268 StreamCassette::from_stream_value(&stream, TransportProfile::remote_stream_fabric())
269}
270
271pub fn cassette_to_stream_frames(
275 cx: &mut sim_kernel::Cx,
276 cassette: &StreamCassette,
277 codec: Symbol,
278 envelope: FrameEnvelope,
279) -> Result<Vec<ServerFrame>> {
280 let stream = cassette.replay_stream_value()?;
281 stream_to_frames_with_profile(
282 cx,
283 &stream,
284 codec,
285 envelope,
286 TransportProfile::remote_stream_fabric(),
287 )
288}
289
290pub fn stream_frames_to_events(
296 cx: &mut sim_kernel::Cx,
297 frames: &[ServerFrame],
298 run: Ref,
299) -> Result<(StreamMetadata, Vec<Event>)> {
300 let mut metadata = None;
301 let mut events = Vec::new();
302 let mut seq = 0u64;
303 for frame in frames {
304 match frame.kind {
305 FrameKind::StreamStart => {
306 let expr = frame.decode_expr(cx, sim_kernel::ReadPolicy::default())?;
307 metadata = Some(metadata_from_expr(&expr)?);
308 }
309 FrameKind::StreamChunk | FrameKind::StreamEnd | FrameKind::Error => {
310 if let Some(event) = remote_frame_to_event(cx, run.clone(), seq, frame)? {
311 let done = matches!(event.kind, EventKind::Done);
312 events.push(event);
313 seq = seq.saturating_add(1);
314 if done {
315 break;
316 }
317 }
318 }
319 _ => {
320 return Err(sim_kernel::Error::Eval(format!(
321 "remote stream adapter cannot consume frame kind {}",
322 frame.kind.as_symbol()
323 )));
324 }
325 }
326 }
327 let metadata = metadata.ok_or_else(|| {
328 sim_kernel::Error::Eval("remote stream frames missing StreamStart metadata".to_owned())
329 })?;
330 Ok((metadata, events))
331}
332
333pub fn remote_frame_to_event(
339 cx: &mut sim_kernel::Cx,
340 run: Ref,
341 seq: u64,
342 frame: &ServerFrame,
343) -> Result<Option<Event>> {
344 match frame.kind {
345 FrameKind::StreamChunk => {
346 let Some(expr) = stream_frame_to_expr(cx, frame)? else {
347 return Err(sim_kernel::Error::Eval(
348 "stream chunk frame did not decode to a payload".to_owned(),
349 ));
350 };
351 let (packet, ticks) = packet_and_ticks_from_remote_expr(expr);
352 Ok(Some(stream_surface::remote_stream_frame_event(
353 run,
354 seq,
355 ticks,
356 packet_ref(cx, &packet)?,
357 )?))
358 }
359 FrameKind::StreamEnd => Ok(Some(Event::done(run, seq)?)),
360 FrameKind::Error => {
361 let packet = remote_error_packet(remote_error_message(cx, frame));
362 Ok(Some(stream_surface::remote_stream_frame_event(
363 run,
364 seq,
365 Vec::new(),
366 packet_ref(cx, &packet)?,
367 )?))
368 }
369 FrameKind::StreamStart => Ok(None),
370 _ => Err(sim_kernel::Error::Eval(format!(
371 "remote stream adapter cannot convert frame kind {}",
372 frame.kind.as_symbol()
373 ))),
374 }
375}
376
377fn remote_error_message(cx: &mut sim_kernel::Cx, frame: &ServerFrame) -> String {
378 match frame.decode_expr(cx, sim_kernel::ReadPolicy::default()) {
379 Ok(expr) => match sim_kernel::Datum::try_from(expr) {
380 Ok(datum) => match cx.datum_store_mut().intern(datum) {
381 Ok(id) => error_message(cx, &Ref::Content(id)),
382 Err(err) => err.to_string(),
383 },
384 Err(err) => err.to_string(),
385 },
386 Err(err) => err.to_string(),
387 }
388}
389
390fn envelope_chunk_frame(
391 cx: &mut sim_kernel::Cx,
392 codec: Symbol,
393 metadata: &StreamMetadata,
394 sequence: u64,
395 item: &StreamItem,
396 profile: TransportProfile,
397 frame_envelope: FrameEnvelope,
398) -> Result<ServerFrame> {
399 let envelope = StreamEnvelope::from_item_with_profile(metadata, sequence, item, profile)?;
400 stream_chunk_frame_from_expr(cx, codec, &envelope.to_expr(), frame_envelope)
401}
402
403fn limit_diagnostic_frame(
404 cx: &mut sim_kernel::Cx,
405 codec: Symbol,
406 metadata: &StreamMetadata,
407 sequence: u64,
408 message: String,
409 frame_envelope: FrameEnvelope,
410) -> Result<ServerFrame> {
411 let kind = stream_limit_diagnostic_kind();
412 let packet = diagnostic_stream_packet(kind.clone(), message);
413 let envelope = StreamEnvelope::new(
414 metadata.id().clone(),
415 Symbol::qualified(
416 "stream/packet-id",
417 format!("{}#diagnostic-{sequence}", metadata.id().as_qualified_str()),
418 ),
419 StreamMedia::Diagnostic,
420 StreamDirection::Source,
421 sequence,
422 Vec::new(),
423 ClockDomain::ServerFrame,
424 TransportProfile::remote_stream_fabric(),
425 vec![kind],
426 packet,
427 )?;
428 stream_chunk_frame_from_expr(cx, codec, &envelope.to_expr(), frame_envelope)
429}