1use sim_kernel::{Error, Expr, Result, Symbol, Tick};
4
5use crate::{
6 BufferPolicy, ClockDomain, LatencyClass, StreamCapability, StreamCassette, StreamDirection,
7 StreamEnvelope, StreamFaultKind, StreamFaultPlan, StreamItem, StreamMedia, StreamMetadata,
8 StreamPacket, StreamStats, TransportProfile,
9};
10
11#[derive(Clone, Debug, PartialEq, Eq)]
13pub struct MediaDescriptor {
14 symbol: Symbol,
15 stream_media: StreamMedia,
16}
17
18impl MediaDescriptor {
19 pub fn named(name: impl AsRef<str>) -> Result<Self> {
21 let name = name.as_ref();
22 if let Some(kind) = name.strip_prefix("ide/event/") {
23 return dev_event_media(kind);
24 }
25 let symbol = match name {
26 "stream/media/pcm" => StreamMedia::Pcm.symbol(),
27 "stream/media/midi" => StreamMedia::Midi.symbol(),
28 "stream/media/diagnostic" => StreamMedia::Diagnostic.symbol(),
29 "stream/media/data" => StreamMedia::Data.symbol(),
30 other => {
31 return Err(Error::Eval(format!(
32 "unsupported stream media descriptor {other}"
33 )));
34 }
35 };
36 let stream_media = StreamMedia::from_symbol(&symbol)?;
37 Ok(Self {
38 symbol,
39 stream_media,
40 })
41 }
42
43 pub fn symbol(&self) -> &Symbol {
45 &self.symbol
46 }
47
48 pub fn stream_media(&self) -> StreamMedia {
50 self.stream_media
51 }
52}
53
54pub fn dev_event_media(kind: &str) -> Result<MediaDescriptor> {
56 validate_dev_event_kind(kind)?;
57 Ok(MediaDescriptor {
58 symbol: Symbol::qualified("ide/event", kind),
59 stream_media: StreamMedia::Data,
60 })
61}
62
63pub fn dev_event_metadata(stream_id: Symbol) -> Result<StreamMetadata> {
65 Ok(StreamMetadata::new(
66 stream_id,
67 StreamMedia::Data,
68 StreamDirection::Source,
69 ClockDomain::ServerFrame.symbol(),
70 BufferPolicy::bounded(128)?,
71 ))
72}
73
74#[derive(Clone, Debug, PartialEq, Eq)]
76pub struct DevEvent {
77 kind: String,
78 atelier_node: Symbol,
79 latency_class: LatencyClass,
80 payload: Expr,
81 ticks: Vec<Tick>,
82}
83
84impl DevEvent {
85 pub fn new(
87 kind: impl Into<String>,
88 atelier_node: Symbol,
89 latency_class: LatencyClass,
90 payload: Expr,
91 ) -> Result<Self> {
92 let kind = kind.into();
93 validate_dev_event_kind(&kind)?;
94 Ok(Self {
95 kind,
96 atelier_node,
97 latency_class,
98 payload,
99 ticks: Vec::new(),
100 })
101 }
102
103 pub fn edit(atelier_node: Symbol, payload: Expr) -> Result<Self> {
105 Self::new("edit", atelier_node, LatencyClass::Interactive, payload)
106 }
107
108 pub fn validate(atelier_node: Symbol, payload: Expr) -> Result<Self> {
110 Self::new(
111 "validate",
112 atelier_node,
113 LatencyClass::OfflineRender,
114 payload,
115 )
116 }
117
118 pub fn refusal(atelier_node: Symbol, payload: Expr) -> Result<Self> {
120 Self::new("refusal", atelier_node, LatencyClass::Interactive, payload)
121 }
122
123 pub fn with_ticks(mut self, ticks: Vec<Tick>) -> Result<Self> {
125 sim_kernel::validate_ticks(&ticks)?;
126 self.ticks = ticks;
127 Ok(self)
128 }
129
130 pub fn kind(&self) -> &str {
132 &self.kind
133 }
134
135 pub fn atelier_node(&self) -> &Symbol {
137 &self.atelier_node
138 }
139
140 pub fn latency_class(&self) -> LatencyClass {
142 self.latency_class
143 }
144
145 pub fn stream_item(&self) -> Result<StreamItem> {
147 StreamItem::with_ticks(
148 StreamPacket::data(
149 dev_event_media(&self.kind)?.symbol().clone(),
150 self.payload_expr(),
151 ),
152 self.ticks.clone(),
153 )
154 }
155
156 fn transport_profile(&self) -> Result<TransportProfile> {
157 TransportProfile::new(
158 Symbol::qualified(
159 "stream/profile",
160 format!("dev-{}", self.latency_class.wire_label()),
161 ),
162 self.latency_class,
163 vec![
164 StreamCapability::Deterministic,
165 StreamCapability::Bounded,
166 StreamCapability::Replayable,
167 ],
168 )
169 }
170
171 fn payload_expr(&self) -> Expr {
172 Expr::Map(vec![
173 (
174 Expr::Symbol(Symbol::new("event-kind")),
175 Expr::Symbol(Symbol::qualified("ide/event", self.kind.clone())),
176 ),
177 (
178 Expr::Symbol(Symbol::new("atelier-node")),
179 Expr::Symbol(self.atelier_node.clone()),
180 ),
181 (
182 Expr::Symbol(Symbol::new("latency-class")),
183 Expr::Symbol(self.latency_class.symbol()),
184 ),
185 (Expr::Symbol(Symbol::new("payload")), self.payload.clone()),
186 ])
187 }
188}
189
190#[derive(Clone, Debug, PartialEq, Eq)]
192pub struct DevCassette {
193 cassette: StreamCassette,
194 content_hash: String,
195}
196
197impl DevCassette {
198 pub fn from_events(stream_id: Symbol, events: Vec<DevEvent>) -> Result<Self> {
200 let metadata = dev_event_metadata(stream_id)?;
201 let envelopes = events
202 .iter()
203 .enumerate()
204 .map(|(sequence, event)| {
205 StreamEnvelope::from_item_with_profile(
206 &metadata,
207 sequence as u64,
208 &event.stream_item()?,
209 event.transport_profile()?,
210 )
211 })
212 .collect::<Result<Vec<_>>>()?;
213 let final_stats = StreamStats {
214 yielded: envelopes.len() as u64,
215 closed: true,
216 ..StreamStats::default()
217 };
218 Self::from_stream_cassette(StreamCassette::from_envelopes(
219 metadata,
220 envelopes,
221 final_stats,
222 )?)
223 }
224
225 pub fn from_stream_cassette(cassette: StreamCassette) -> Result<Self> {
227 let content_hash = cassette_content_hash(&cassette);
228 Ok(Self {
229 cassette,
230 content_hash,
231 })
232 }
233
234 pub fn cassette(&self) -> &StreamCassette {
236 &self.cassette
237 }
238
239 pub fn content_hash(&self) -> &str {
241 &self.content_hash
242 }
243
244 pub fn redacted(&self) -> Result<Self> {
246 Self::from_stream_cassette(self.cassette.redacted()?)
247 }
248
249 pub fn validate_golden_fixture(&self, path: &str) -> Result<crate::StreamGoldenFixtureReport> {
251 self.cassette.validate_golden_fixture(path)
252 }
253
254 pub fn replay_content_hash(&self) -> Result<String> {
256 let items = self.cassette.items()?;
257 let metadata = self.cassette.metadata().clone();
258 let envelopes = items
259 .iter()
260 .enumerate()
261 .zip(self.cassette.envelopes())
262 .map(|((sequence, item), original)| {
263 StreamEnvelope::from_item_with_profile(
264 &metadata,
265 sequence as u64,
266 item,
267 original.profile().clone(),
268 )
269 })
270 .collect::<Result<Vec<_>>>()?;
271 let replay = StreamCassette::from_envelopes(
272 metadata,
273 envelopes,
274 self.cassette.final_stats().clone(),
275 )?;
276 Ok(cassette_content_hash(&replay))
277 }
278
279 pub fn replay_with_fault(&self, plan: &StreamFaultPlan) -> Result<DevFaultReport> {
281 let result = plan.apply(&self.cassette.items()?);
282 let mut diagnostics = result.diagnostics;
283 if diagnostics.contains(&StreamFaultKind::Drop.symbol()) {
284 push_unique(&mut diagnostics, dev_dropped_chunks_diagnostic());
285 }
286 Ok(DevFaultReport {
287 items: result.items,
288 diagnostics,
289 })
290 }
291}
292
293#[derive(Clone, Debug, PartialEq, Eq)]
295pub struct DevFaultReport {
296 pub items: Vec<StreamItem>,
298 pub diagnostics: Vec<Symbol>,
300}
301
302pub fn dev_dropped_chunks_diagnostic() -> Symbol {
304 Symbol::qualified("dev/diagnostic", "dropped-chunks")
305}
306
307fn validate_dev_event_kind(kind: &str) -> Result<()> {
308 let valid = !kind.is_empty()
309 && kind
310 .bytes()
311 .all(|byte| byte.is_ascii_lowercase() || byte.is_ascii_digit() || matches!(byte, b'-'));
312 if valid {
313 Ok(())
314 } else {
315 Err(Error::Eval(format!("invalid dev event kind {kind:?}")))
316 }
317}
318
319fn cassette_content_hash(cassette: &StreamCassette) -> String {
320 let key = cassette.to_expr().canonical_key();
321 let mut hash = 0xcbf29ce484222325u64;
322 hash_bytes(&mut hash, format!("{key:?}").as_bytes());
323 format!("fnv1a64:{hash:016x}")
324}
325
326fn hash_bytes(hash: &mut u64, bytes: &[u8]) {
327 for byte in bytes {
328 *hash ^= u64::from(*byte);
329 *hash = hash.wrapping_mul(0x100000001b3);
330 }
331}
332
333fn push_unique(symbols: &mut Vec<Symbol>, symbol: Symbol) {
334 if !symbols.contains(&symbol) {
335 symbols.push(symbol);
336 }
337}