sim_lib_stream_fabric/
events.rs1use sim_kernel::{
2 Datum, Diagnostic, Error, Event, EventKind, Expr, Ref, Result, Severity, Symbol, Tick,
3 value_from_ref,
4};
5use sim_lib_stream_core::{
6 StreamCapability, StreamDiagnostic, StreamEnvelope, StreamItem, StreamMetadata, StreamPacket,
7 StreamValue,
8};
9
10pub fn remote_error_diagnostic_kind() -> Symbol {
12 Symbol::qualified("stream/fabric", "RemoteError")
13}
14
15pub fn refused_profile_diagnostic_kind() -> Symbol {
17 Symbol::qualified("stream/fabric", "RefusedProfile")
18}
19
20pub fn stream_limit_diagnostic_kind() -> Symbol {
22 Symbol::qualified("stream/fabric", "LimitExceeded")
23}
24
25pub fn expr_chunk_event(
30 cx: &mut sim_kernel::Cx,
31 run: Ref,
32 seq: u64,
33 kind: Symbol,
34 expr: Expr,
35) -> Result<Event> {
36 let packet = StreamPacket::data(kind, expr);
37 Event::new(
38 run,
39 seq,
40 Vec::new(),
41 EventKind::Chunk {
42 payload: packet.intern_ref(cx)?,
43 },
44 )
45}
46
47pub fn event_buffer_to_stream(
53 cx: &mut sim_kernel::Cx,
54 metadata: StreamMetadata,
55 events: impl IntoIterator<Item = Event>,
56) -> Result<StreamValue> {
57 let mut items = Vec::new();
58 for event in events {
59 match event.kind {
60 EventKind::Chunk { payload } => {
61 let packet = packet_from_ref(cx, &payload)?;
62 items.push(StreamItem::with_ticks(packet, event.ticks)?);
63 }
64 EventKind::Diagnostic(diagnostic) => {
65 items.push(StreamItem::new(StreamPacket::Diagnostic(
66 diagnostic_packet(diagnostic),
67 )));
68 }
69 EventKind::Failed(error) => {
70 items.push(StreamItem::new(remote_error_packet(error_message(
71 cx, &error,
72 ))));
73 break;
74 }
75 EventKind::Done => break,
76 EventKind::Started { .. }
77 | EventKind::Claim { .. }
78 | EventKind::Trace(_)
79 | EventKind::EffectRequested { .. }
80 | EventKind::EffectResolved { .. }
81 | EventKind::Capture { .. }
82 | EventKind::Card { .. }
83 | EventKind::Final(_) => {}
84 }
85 }
86 Ok(StreamValue::pull(metadata, items))
87}
88
89pub(crate) fn packet_from_ref(cx: &mut sim_kernel::Cx, reference: &Ref) -> Result<StreamPacket> {
90 let value = match value_from_ref(cx, reference) {
91 Ok(value) => value,
92 Err(error) => {
93 return Ok(chunk_payload_error_packet(format!(
94 "stream chunk payload ref could not be loaded: {error}"
95 )));
96 }
97 };
98 let expr = match value.object().as_expr(cx) {
99 Ok(expr) => expr,
100 Err(error) => {
101 return Ok(chunk_payload_error_packet(format!(
102 "stream chunk payload could not be represented as Expr: {error}"
103 )));
104 }
105 };
106 Ok(packet_from_expr(expr))
107}
108
109pub(crate) fn packet_and_ticks_from_remote_expr(expr: Expr) -> (StreamPacket, Vec<Tick>) {
110 match StreamEnvelope::try_from(expr.clone()) {
111 Ok(envelope) => match remote_profile_refusal(&envelope) {
112 Some(message) => (
113 diagnostic_stream_packet(refused_profile_diagnostic_kind(), message),
114 Vec::new(),
115 ),
116 None => (envelope.packet().clone(), envelope.ticks().to_vec()),
117 },
118 Err(_) => (packet_from_expr(expr), Vec::new()),
119 }
120}
121
122pub(crate) fn packet_from_expr(expr: Expr) -> StreamPacket {
123 match StreamPacket::try_from(expr.clone()) {
124 Ok(packet) => packet,
125 Err(packet_error) => match Datum::try_from(expr.clone()) {
126 Ok(_) => StreamPacket::data(Symbol::qualified("stream/data", "expr"), expr),
127 Err(datum_error) => chunk_payload_error_packet(format!(
128 "stream chunk payload is neither a stream packet nor datum Expr: packet parse failed: {packet_error}; datum conversion failed: {datum_error}"
129 )),
130 },
131 }
132}
133
134pub(crate) fn diagnostic_stream_packet(kind: Symbol, message: impl Into<String>) -> StreamPacket {
135 StreamPacket::Diagnostic(StreamDiagnostic::new(kind, message.into()))
136}
137
138pub(crate) fn remote_error_packet(message: impl Into<String>) -> StreamPacket {
139 diagnostic_stream_packet(remote_error_diagnostic_kind(), message)
140}
141
142fn chunk_payload_error_packet(message: impl Into<String>) -> StreamPacket {
143 diagnostic_stream_packet(
144 Symbol::qualified("stream/fabric", "ChunkPayloadError"),
145 message.into(),
146 )
147}
148
149pub(crate) fn error_message(cx: &mut sim_kernel::Cx, reference: &Ref) -> String {
150 match value_from_ref(cx, reference) {
151 Ok(value) => value
152 .object()
153 .display(cx)
154 .unwrap_or_else(|err| err.to_string()),
155 Err(err) => err.to_string(),
156 }
157}
158
159fn diagnostic_packet(diagnostic: Diagnostic) -> StreamDiagnostic {
160 let kind = diagnostic
161 .code
162 .unwrap_or_else(|| Symbol::qualified("stream/fabric", "Diagnostic"));
163 let prefix = match diagnostic.severity {
164 Severity::Error => "error",
165 Severity::Warning => "warning",
166 Severity::Info => "info",
167 Severity::Note => "note",
168 };
169 StreamDiagnostic::new(kind, format!("{prefix}: {}", diagnostic.message))
170}
171
172pub(crate) fn packet_ref(cx: &mut sim_kernel::Cx, packet: &StreamPacket) -> Result<Ref> {
173 packet.intern_ref(cx)
174}
175
176pub(crate) fn metadata_from_expr(expr: &Expr) -> Result<StreamMetadata> {
177 StreamMetadata::from_table_expr(expr).map_err(|err| {
178 Error::Eval(format!(
179 "remote stream start frame did not contain stream metadata: {err}"
180 ))
181 })
182}
183
184fn remote_profile_refusal(envelope: &StreamEnvelope) -> Option<String> {
185 let profile = envelope.profile();
186 if profile.name().as_qualified_str() == "stream/profile/realtime-local-audio"
187 || profile.has_capability(StreamCapability::Realtime)
188 && !profile.has_capability(StreamCapability::Preview)
189 {
190 return Some(format!(
191 "stream profile {} is refused over remote stream fabric frames; convert to stream/profile/lan-buffered-audio-preview chunks",
192 profile.name().as_qualified_str()
193 ));
194 }
195 None
196}