1use sim_kernel::{Error, Expr, Result, Symbol};
12
13#[path = "cassette/redaction.rs"]
14mod redaction;
15#[path = "cassette/stats.rs"]
16mod stats;
17
18use crate::buffer::{expr_kind, field, string_field, symbol_field};
19use crate::{
20 StreamCapability, StreamEnvelope, StreamItem, StreamMetadata, StreamPacket, StreamStats,
21 StreamValue, TransportProfile,
22};
23
24use redaction::{
25 envelope_has_host_device, is_host_device_symbol, metadata_has_host_device,
26 packet_has_private_payload, redact_envelope, redact_metadata, redact_symbol,
27};
28use stats::{stream_stats_expr, stream_stats_from_expr};
29
/// Path prefix that every golden stream cassette fixture path must start with.
///
/// `validate_fixture_path` requires a `/` and at least one more character
/// after this prefix.
pub const STREAM_CASSETTE_FIXTURE_ROOT: &str = "fixtures/streams/golden";

/// File extension (without the leading dot) required for golden stream
/// cassette fixture paths.
pub const STREAM_CASSETTE_EXTENSION: &str = "simcassette";
34
35#[derive(Clone, Debug, PartialEq, Eq)]
41pub struct StreamCassetteTiming {
42 pub clock: Symbol,
44 pub packet_count: usize,
46 pub first_sequence: Option<u64>,
48 pub last_sequence: Option<u64>,
50 pub finite: bool,
52}
53
54#[derive(Clone, Debug, PartialEq, Eq)]
62pub struct StreamCassette {
63 metadata: StreamMetadata,
64 envelopes: Vec<StreamEnvelope>,
65 timing: StreamCassetteTiming,
66 diagnostics: Vec<Symbol>,
67 final_stats: StreamStats,
68}
69
70#[derive(Clone, Debug, PartialEq, Eq)]
76pub struct StreamGoldenFixtureReport {
77 pub path: String,
79 pub format: Symbol,
81 pub packet_count: usize,
83 pub final_stats: StreamStats,
85}
86
87impl StreamCassette {
88 pub fn from_stream_value(stream: &StreamValue, profile: TransportProfile) -> Result<Self> {
94 let mut items = Vec::new();
95 while let Some(item) = stream.next_packet()? {
96 items.push(item);
97 }
98 let final_stats = stream.stats()?;
99 Self::from_items(stream.metadata().clone(), items, profile, final_stats)
100 }
101
102 pub fn from_items(
108 metadata: StreamMetadata,
109 items: Vec<StreamItem>,
110 profile: TransportProfile,
111 final_stats: StreamStats,
112 ) -> Result<Self> {
113 let envelopes = items
114 .iter()
115 .enumerate()
116 .map(|(sequence, item)| {
117 StreamEnvelope::from_item_with_profile(
118 &metadata,
119 sequence as u64,
120 item,
121 profile.clone(),
122 )
123 })
124 .collect::<Result<Vec<_>>>()?;
125 Self::from_envelopes(metadata, envelopes, final_stats)
126 }
127
128 pub fn from_envelopes(
134 metadata: StreamMetadata,
135 envelopes: Vec<StreamEnvelope>,
136 final_stats: StreamStats,
137 ) -> Result<Self> {
138 let timing = timing_from_envelopes(&metadata, &envelopes);
139 let diagnostics = diagnostics_from_envelopes(&envelopes);
140 Ok(Self {
141 metadata,
142 envelopes,
143 timing,
144 diagnostics,
145 final_stats,
146 })
147 }
148
149 pub fn metadata(&self) -> &StreamMetadata {
151 &self.metadata
152 }
153
154 pub fn envelopes(&self) -> &[StreamEnvelope] {
156 &self.envelopes
157 }
158
159 pub fn timing(&self) -> &StreamCassetteTiming {
161 &self.timing
162 }
163
164 pub fn diagnostics(&self) -> &[Symbol] {
166 &self.diagnostics
167 }
168
169 pub fn final_stats(&self) -> &StreamStats {
171 &self.final_stats
172 }
173
174 pub fn items(&self) -> Result<Vec<StreamItem>> {
179 self.envelopes
180 .iter()
181 .map(|envelope| {
182 StreamItem::with_ticks(envelope.packet().clone(), envelope.ticks().to_vec())
183 })
184 .collect()
185 }
186
187 pub fn replay_stream_value(&self) -> Result<StreamValue> {
189 Ok(StreamValue::pull(self.metadata.clone(), self.items()?))
190 }
191
192 pub fn to_expr(&self) -> Expr {
198 Expr::Map(vec![
199 (
200 Expr::Symbol(Symbol::new("cassette")),
201 Expr::Symbol(stream_cassette_format_symbol()),
202 ),
203 (
204 Expr::Symbol(Symbol::new("metadata")),
205 self.metadata.table_expr(),
206 ),
207 (Expr::Symbol(Symbol::new("timing")), self.timing.to_expr()),
208 (
209 Expr::Symbol(Symbol::new("envelopes")),
210 Expr::List(self.envelopes.iter().map(StreamEnvelope::to_expr).collect()),
211 ),
212 (
213 Expr::Symbol(Symbol::new("diagnostics")),
214 Expr::List(self.diagnostics.iter().cloned().map(Expr::Symbol).collect()),
215 ),
216 (
217 Expr::Symbol(Symbol::new("final-stats")),
218 stream_stats_expr(&self.final_stats),
219 ),
220 ])
221 }
222
223 pub fn from_expr(expr: &Expr) -> Result<Self> {
230 let Expr::Map(entries) = expr else {
231 return Err(Error::TypeMismatch {
232 expected: "stream cassette map",
233 found: expr_kind(expr),
234 });
235 };
236 ensure_fields(
237 entries,
238 &[
239 "cassette",
240 "metadata",
241 "timing",
242 "envelopes",
243 "diagnostics",
244 "final-stats",
245 ],
246 )?;
247 let format = symbol_field(entries, "cassette")?;
248 if *format != stream_cassette_format_symbol() {
249 return Err(Error::Eval(format!(
250 "unknown stream cassette format {}",
251 format.as_qualified_str()
252 )));
253 }
254 let metadata = StreamMetadata::from_table_expr(field(entries, "metadata")?)?;
255 let envelopes = list_field(entries, "envelopes")?
256 .iter()
257 .map(|expr| StreamEnvelope::try_from(expr.clone()))
258 .collect::<Result<Vec<_>>>()?;
259 let metadata = restore_metadata_id(metadata, &envelopes);
260 let timing = StreamCassetteTiming::from_expr(field(entries, "timing")?)?;
261 let diagnostics = symbol_list(entries, "diagnostics")?;
262 let final_stats = stream_stats_from_expr(field(entries, "final-stats")?)?;
263 Ok(Self {
264 metadata,
265 envelopes,
266 timing,
267 diagnostics,
268 final_stats,
269 })
270 }
271
272 pub fn redacted(&self) -> Result<Self> {
277 let metadata = redact_metadata(&self.metadata);
278 let envelopes = self
279 .envelopes
280 .iter()
281 .map(redact_envelope)
282 .collect::<Result<Vec<_>>>()?;
283 let mut redacted = Self::from_envelopes(metadata, envelopes, self.final_stats.clone())?;
284 redacted.diagnostics = self.diagnostics.iter().map(redact_symbol).collect();
285 Ok(redacted)
286 }
287
288 pub fn validate_golden_fixture(&self, path: &str) -> Result<StreamGoldenFixtureReport> {
297 validate_fixture_path(path)?;
298 if !self.timing.finite {
299 return Err(Error::Eval(
300 "golden stream fixture must be finite".to_owned(),
301 ));
302 }
303 for (index, envelope) in self.envelopes.iter().enumerate() {
304 if envelope.sequence() != index as u64 {
305 return Err(Error::Eval(format!(
306 "golden stream fixture sequence {} is not packet index {index}",
307 envelope.sequence()
308 )));
309 }
310 if !envelope
311 .profile()
312 .has_capability(StreamCapability::Replayable)
313 && !envelope.profile().has_capability(StreamCapability::Preview)
314 {
315 return Err(Error::Eval(format!(
316 "golden stream fixture profile {} is not replayable or previewable",
317 envelope.profile().name()
318 )));
319 }
320 if envelope
321 .profile()
322 .has_capability(StreamCapability::Realtime)
323 {
324 return Err(Error::Eval(
325 "golden stream fixture cannot require realtime transport".to_owned(),
326 ));
327 }
328 if packet_has_private_payload(envelope.packet()) || envelope_has_host_device(envelope) {
329 return Err(Error::Eval(
330 "golden stream fixture contains unredacted payload".to_owned(),
331 ));
332 }
333 }
334 if metadata_has_host_device(&self.metadata)
335 || is_host_device_symbol(&self.timing.clock)
336 || self.diagnostics.iter().any(is_host_device_symbol)
337 {
338 return Err(Error::Eval(
339 "golden stream fixture contains an unredacted host device name".to_owned(),
340 ));
341 }
342 Ok(StreamGoldenFixtureReport {
343 path: path.to_owned(),
344 format: stream_cassette_format_symbol(),
345 packet_count: self.envelopes.len(),
346 final_stats: self.final_stats.clone(),
347 })
348 }
349}
350
351impl StreamCassetteTiming {
352 pub fn to_expr(&self) -> Expr {
354 Expr::Map(vec![
355 (
356 Expr::Symbol(Symbol::new("clock")),
357 Expr::Symbol(self.clock.clone()),
358 ),
359 (
360 Expr::Symbol(Symbol::new("packet-count")),
361 Expr::String(self.packet_count.to_string()),
362 ),
363 (
364 Expr::Symbol(Symbol::new("first-sequence")),
365 optional_u64_expr(self.first_sequence),
366 ),
367 (
368 Expr::Symbol(Symbol::new("last-sequence")),
369 optional_u64_expr(self.last_sequence),
370 ),
371 (Expr::Symbol(Symbol::new("finite")), Expr::Bool(self.finite)),
372 ])
373 }
374
375 pub fn from_expr(expr: &Expr) -> Result<Self> {
381 let Expr::Map(entries) = expr else {
382 return Err(Error::TypeMismatch {
383 expected: "stream cassette timing map",
384 found: expr_kind(expr),
385 });
386 };
387 ensure_fields(
388 entries,
389 &[
390 "clock",
391 "packet-count",
392 "first-sequence",
393 "last-sequence",
394 "finite",
395 ],
396 )?;
397 Ok(Self {
398 clock: symbol_field(entries, "clock")?.clone(),
399 packet_count: parse_usize(entries, "packet-count")?,
400 first_sequence: optional_u64(field(entries, "first-sequence")?)?,
401 last_sequence: optional_u64(field(entries, "last-sequence")?)?,
402 finite: bool_field(entries, "finite")?,
403 })
404 }
405}
406
407pub fn stream_cassette_format_symbol() -> Symbol {
409 Symbol::qualified("stream/cassette", "v1")
410}
411
412pub fn stream_cassette_golden_root() -> &'static str {
414 STREAM_CASSETTE_FIXTURE_ROOT
415}
416
417pub fn stream_cassette_golden_extension() -> &'static str {
419 STREAM_CASSETTE_EXTENSION
420}
421
422fn timing_from_envelopes(
423 metadata: &StreamMetadata,
424 envelopes: &[StreamEnvelope],
425) -> StreamCassetteTiming {
426 StreamCassetteTiming {
427 clock: metadata.clock().clone(),
428 packet_count: envelopes.len(),
429 first_sequence: envelopes.first().map(StreamEnvelope::sequence),
430 last_sequence: envelopes.last().map(StreamEnvelope::sequence),
431 finite: true,
432 }
433}
434
435fn restore_metadata_id(metadata: StreamMetadata, envelopes: &[StreamEnvelope]) -> StreamMetadata {
436 let Some(first) = envelopes.first() else {
437 return metadata;
438 };
439 if metadata.id().as_qualified_str() != first.stream_id().as_qualified_str() {
440 return metadata;
441 }
442 StreamMetadata::new(
443 first.stream_id().clone(),
444 metadata.media(),
445 metadata.direction(),
446 metadata.clock().clone(),
447 metadata.buffer().clone(),
448 )
449}
450
451fn diagnostics_from_envelopes(envelopes: &[StreamEnvelope]) -> Vec<Symbol> {
452 let mut diagnostics = Vec::new();
453 for envelope in envelopes {
454 for diagnostic in envelope.diagnostics() {
455 push_unique(&mut diagnostics, diagnostic.clone());
456 }
457 if let StreamPacket::Diagnostic(packet) = envelope.packet() {
458 push_unique(&mut diagnostics, packet.kind().clone());
459 }
460 }
461 diagnostics
462}
463
464fn push_unique(symbols: &mut Vec<Symbol>, symbol: Symbol) {
465 if !symbols.contains(&symbol) {
466 symbols.push(symbol);
467 }
468}
469
470fn validate_fixture_path(path: &str) -> Result<()> {
471 let Some(relative) = path.strip_prefix(STREAM_CASSETTE_FIXTURE_ROOT) else {
472 return Err(Error::Eval(format!(
473 "golden stream fixture path must live under {STREAM_CASSETTE_FIXTURE_ROOT}"
474 )));
475 };
476 if !relative.starts_with('/') || relative == "/" {
477 return Err(Error::Eval(format!(
478 "golden stream fixture path must live under {STREAM_CASSETTE_FIXTURE_ROOT}"
479 )));
480 }
481 let expected_extension = format!(".{STREAM_CASSETTE_EXTENSION}");
482 if !path.ends_with(&expected_extension) {
483 return Err(Error::Eval(format!(
484 "golden stream fixture path must end in .{STREAM_CASSETTE_EXTENSION}"
485 )));
486 }
487 Ok(())
488}
489
490fn ensure_fields(entries: &[(Expr, Expr)], allowed: &[&str]) -> Result<()> {
491 for (key, _) in entries {
492 let Expr::Symbol(symbol) = key else {
493 return Err(Error::TypeMismatch {
494 expected: "symbol stream cassette field",
495 found: expr_kind(key),
496 });
497 };
498 if symbol.namespace.is_none() && allowed.contains(&symbol.name.as_ref()) {
499 continue;
500 }
501 return Err(Error::Eval(format!(
502 "unknown stream cassette field {}",
503 symbol.as_qualified_str()
504 )));
505 }
506 Ok(())
507}
508
509fn list_field<'a>(entries: &'a [(Expr, Expr)], name: &str) -> Result<&'a [Expr]> {
510 match field(entries, name)? {
511 Expr::List(items) => Ok(items),
512 other => Err(Error::TypeMismatch {
513 expected: "list field",
514 found: expr_kind(other),
515 }),
516 }
517}
518
519fn symbol_list(entries: &[(Expr, Expr)], name: &str) -> Result<Vec<Symbol>> {
520 list_field(entries, name)?
521 .iter()
522 .map(|expr| match expr {
523 Expr::Symbol(symbol) => Ok(symbol.clone()),
524 other => Err(Error::TypeMismatch {
525 expected: "symbol list item",
526 found: expr_kind(other),
527 }),
528 })
529 .collect()
530}
531
532fn parse_usize(entries: &[(Expr, Expr)], name: &str) -> Result<usize> {
533 string_field(entries, name)?
534 .parse::<usize>()
535 .map_err(|err| Error::Eval(format!("invalid stream cassette {name}: {err}")))
536}
537
538fn optional_u64(expr: &Expr) -> Result<Option<u64>> {
539 match expr {
540 Expr::Nil => Ok(None),
541 Expr::String(value) => value
542 .parse::<u64>()
543 .map(Some)
544 .map_err(|err| Error::Eval(format!("invalid stream cassette sequence: {err}"))),
545 other => Err(Error::TypeMismatch {
546 expected: "optional u64 string",
547 found: expr_kind(other),
548 }),
549 }
550}
551
552fn optional_u64_expr(value: Option<u64>) -> Expr {
553 value
554 .map(|value| Expr::String(value.to_string()))
555 .unwrap_or(Expr::Nil)
556}
557
558fn bool_field(entries: &[(Expr, Expr)], name: &str) -> Result<bool> {
559 match field(entries, name)? {
560 Expr::Bool(value) => Ok(*value),
561 other => Err(Error::TypeMismatch {
562 expected: "bool field",
563 found: expr_kind(other),
564 }),
565 }
566}