Skip to main content

sim_lib_stream_fabric/
events.rs

1use sim_kernel::{
2    Datum, Diagnostic, Error, Event, EventKind, Expr, Ref, Result, Severity, Symbol, Tick,
3    value_from_ref,
4};
5use sim_lib_stream_core::{
6    StreamCapability, StreamDiagnostic, StreamEnvelope, StreamItem, StreamMetadata, StreamPacket,
7    StreamValue,
8};
9
10/// Returns the diagnostic kind marking a remote-side error surfaced locally.
11pub fn remote_error_diagnostic_kind() -> Symbol {
12    Symbol::qualified("stream/fabric", "RemoteError")
13}
14
15/// Returns the diagnostic kind marking a transport profile refused over frames.
16pub fn refused_profile_diagnostic_kind() -> Symbol {
17    Symbol::qualified("stream/fabric", "RefusedProfile")
18}
19
20/// Returns the diagnostic kind marking a stream truncated by a fabric limit.
21pub fn stream_limit_diagnostic_kind() -> Symbol {
22    Symbol::qualified("stream/fabric", "LimitExceeded")
23}
24
25/// Builds a chunk event whose payload is the interned data packet `(kind expr)`.
26///
27/// Used to inject an arbitrary `Expr` into a run's event stream at sequence
28/// `seq` under the realize event surface.
29pub fn expr_chunk_event(
30    cx: &mut sim_kernel::Cx,
31    run: Ref,
32    seq: u64,
33    kind: Symbol,
34    expr: Expr,
35) -> Result<Event> {
36    let packet = StreamPacket::data(kind, expr);
37    Event::new(
38        run,
39        seq,
40        Vec::new(),
41        EventKind::Chunk {
42            payload: packet.intern_ref(cx)?,
43        },
44    )
45}
46
47/// Folds a buffer of run events into a pull-based [`StreamValue`].
48///
49/// Chunk events become stream items, diagnostic events become diagnostic
50/// packets, and a failure or `Done` event ends the stream. Non-stream event
51/// kinds (started, claim, trace, effect, capture, card, final) are ignored.
52pub fn event_buffer_to_stream(
53    cx: &mut sim_kernel::Cx,
54    metadata: StreamMetadata,
55    events: impl IntoIterator<Item = Event>,
56) -> Result<StreamValue> {
57    let mut items = Vec::new();
58    for event in events {
59        match event.kind {
60            EventKind::Chunk { payload } => {
61                let packet = packet_from_ref(cx, &payload)?;
62                items.push(StreamItem::with_ticks(packet, event.ticks)?);
63            }
64            EventKind::Diagnostic(diagnostic) => {
65                items.push(StreamItem::new(StreamPacket::Diagnostic(
66                    diagnostic_packet(diagnostic),
67                )));
68            }
69            EventKind::Failed(error) => {
70                items.push(StreamItem::new(remote_error_packet(error_message(
71                    cx, &error,
72                ))));
73                break;
74            }
75            EventKind::Done => break,
76            EventKind::Started { .. }
77            | EventKind::Claim { .. }
78            | EventKind::Trace(_)
79            | EventKind::EffectRequested { .. }
80            | EventKind::EffectResolved { .. }
81            | EventKind::Capture { .. }
82            | EventKind::Card { .. }
83            | EventKind::Final(_) => {}
84        }
85    }
86    Ok(StreamValue::pull(metadata, items))
87}
88
89pub(crate) fn packet_from_ref(cx: &mut sim_kernel::Cx, reference: &Ref) -> Result<StreamPacket> {
90    let value = match value_from_ref(cx, reference) {
91        Ok(value) => value,
92        Err(error) => {
93            return Ok(chunk_payload_error_packet(format!(
94                "stream chunk payload ref could not be loaded: {error}"
95            )));
96        }
97    };
98    let expr = match value.object().as_expr(cx) {
99        Ok(expr) => expr,
100        Err(error) => {
101            return Ok(chunk_payload_error_packet(format!(
102                "stream chunk payload could not be represented as Expr: {error}"
103            )));
104        }
105    };
106    Ok(packet_from_expr(expr))
107}
108
109pub(crate) fn packet_and_ticks_from_remote_expr(expr: Expr) -> (StreamPacket, Vec<Tick>) {
110    match StreamEnvelope::try_from(expr.clone()) {
111        Ok(envelope) => match remote_profile_refusal(&envelope) {
112            Some(message) => (
113                diagnostic_stream_packet(refused_profile_diagnostic_kind(), message),
114                Vec::new(),
115            ),
116            None => (envelope.packet().clone(), envelope.ticks().to_vec()),
117        },
118        Err(_) => (packet_from_expr(expr), Vec::new()),
119    }
120}
121
122pub(crate) fn packet_from_expr(expr: Expr) -> StreamPacket {
123    match StreamPacket::try_from(expr.clone()) {
124        Ok(packet) => packet,
125        Err(packet_error) => match Datum::try_from(expr.clone()) {
126            Ok(_) => StreamPacket::data(Symbol::qualified("stream/data", "expr"), expr),
127            Err(datum_error) => chunk_payload_error_packet(format!(
128                "stream chunk payload is neither a stream packet nor datum Expr: packet parse failed: {packet_error}; datum conversion failed: {datum_error}"
129            )),
130        },
131    }
132}
133
134pub(crate) fn diagnostic_stream_packet(kind: Symbol, message: impl Into<String>) -> StreamPacket {
135    StreamPacket::Diagnostic(StreamDiagnostic::new(kind, message.into()))
136}
137
138pub(crate) fn remote_error_packet(message: impl Into<String>) -> StreamPacket {
139    diagnostic_stream_packet(remote_error_diagnostic_kind(), message)
140}
141
142fn chunk_payload_error_packet(message: impl Into<String>) -> StreamPacket {
143    diagnostic_stream_packet(
144        Symbol::qualified("stream/fabric", "ChunkPayloadError"),
145        message.into(),
146    )
147}
148
149pub(crate) fn error_message(cx: &mut sim_kernel::Cx, reference: &Ref) -> String {
150    match value_from_ref(cx, reference) {
151        Ok(value) => value
152            .object()
153            .display(cx)
154            .unwrap_or_else(|err| err.to_string()),
155        Err(err) => err.to_string(),
156    }
157}
158
159fn diagnostic_packet(diagnostic: Diagnostic) -> StreamDiagnostic {
160    let kind = diagnostic
161        .code
162        .unwrap_or_else(|| Symbol::qualified("stream/fabric", "Diagnostic"));
163    let prefix = match diagnostic.severity {
164        Severity::Error => "error",
165        Severity::Warning => "warning",
166        Severity::Info => "info",
167        Severity::Note => "note",
168    };
169    StreamDiagnostic::new(kind, format!("{prefix}: {}", diagnostic.message))
170}
171
172pub(crate) fn packet_ref(cx: &mut sim_kernel::Cx, packet: &StreamPacket) -> Result<Ref> {
173    packet.intern_ref(cx)
174}
175
176pub(crate) fn metadata_from_expr(expr: &Expr) -> Result<StreamMetadata> {
177    StreamMetadata::from_table_expr(expr).map_err(|err| {
178        Error::Eval(format!(
179            "remote stream start frame did not contain stream metadata: {err}"
180        ))
181    })
182}
183
184fn remote_profile_refusal(envelope: &StreamEnvelope) -> Option<String> {
185    let profile = envelope.profile();
186    if profile.name().as_qualified_str() == "stream/profile/realtime-local-audio"
187        || profile.has_capability(StreamCapability::Realtime)
188            && !profile.has_capability(StreamCapability::Preview)
189    {
190        return Some(format!(
191            "stream profile {} is refused over remote stream fabric frames; convert to stream/profile/lan-buffered-audio-preview chunks",
192            profile.name().as_qualified_str()
193        ));
194    }
195    None
196}