Skip to main content

sim_lib_stream_core/
cassette.rs

1//! Golden-fixture record and replay for streams.
2//!
3//! A [`StreamCassette`] captures a deterministic trace of a stream -- its
4//! metadata, the ordered envelopes it produced, derived timing, accumulated
5//! diagnostics, and the final stats -- so that the same trace can be replayed
6//! as a fresh [`StreamValue`] or persisted as a golden fixture for tests. The
7//! kernel supplies the protocol types ([`Expr`], [`Symbol`], [`Error`]) used to
8//! serialize a cassette; this module supplies the concrete streaming-fabric
9//! behavior that records, redacts, validates, and round-trips those traces.
10
11use sim_kernel::{Error, Expr, Result, Symbol};
12
13#[path = "cassette/redaction.rs"]
14mod redaction;
15#[path = "cassette/stats.rs"]
16mod stats;
17
18use crate::buffer::{expr_kind, field, string_field, symbol_field};
19use crate::{
20    StreamCapability, StreamEnvelope, StreamItem, StreamMetadata, StreamPacket, StreamStats,
21    StreamValue, TransportProfile,
22};
23
24use redaction::{
25    envelope_has_host_device, is_host_device_symbol, metadata_has_host_device,
26    packet_has_private_payload, redact_envelope, redact_metadata, redact_symbol,
27};
28use stats::{stream_stats_expr, stream_stats_from_expr};
29
/// Repository-relative root directory under which golden stream fixtures live.
///
/// Fixture path validation requires a path to start with this prefix followed
/// by `/`.
pub const STREAM_CASSETTE_FIXTURE_ROOT: &str = "fixtures/streams/golden";
/// File extension (without the leading dot) for a persisted golden fixture.
///
/// Fixture path validation prepends the dot itself before comparing.
pub const STREAM_CASSETTE_EXTENSION: &str = "simcassette";
34
/// Derived timing summary for a recorded cassette.
///
/// Captures the clock the stream ran on, how many packets were recorded, the
/// sequence range of the first and last envelopes, and whether the trace is
/// finite (a golden fixture must be finite).
///
/// The summary is stored alongside the envelopes when a cassette is
/// serialized, and is read back as-is on deserialization rather than being
/// recomputed.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StreamCassetteTiming {
    /// Clock-domain symbol the recorded stream advanced against.
    pub clock: Symbol,
    /// Number of envelopes captured in the cassette.
    pub packet_count: usize,
    /// Sequence number of the first envelope, or `None` when empty.
    pub first_sequence: Option<u64>,
    /// Sequence number of the last envelope, or `None` when empty.
    pub last_sequence: Option<u64>,
    /// Whether the recorded trace terminated; golden fixtures must be finite.
    ///
    /// Timing derived from envelopes in this module always sets this to
    /// `true`; only a deserialized summary can carry `false`.
    pub finite: bool,
}
53
/// A recorded, replayable trace of a single stream.
///
/// Holds the stream metadata, the ordered envelopes, derived [timing](StreamCassetteTiming),
/// the deduplicated diagnostic symbols observed, and the final [`StreamStats`].
/// A cassette can be rebuilt into a live [`StreamValue`] via
/// [`replay_stream_value`](StreamCassette::replay_stream_value), serialized to
/// an [`Expr`] map, and validated as a golden fixture.
///
/// All fields are private; read them through the accessor methods.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StreamCassette {
    // Metadata of the stream the trace was recorded from.
    metadata: StreamMetadata,
    // Recorded envelopes, in the order they were captured.
    envelopes: Vec<StreamEnvelope>,
    // Summary derived from the envelopes, or read back from a serialized cassette.
    timing: StreamCassetteTiming,
    // Diagnostic symbols gathered from the envelopes, first occurrence first.
    diagnostics: Vec<Symbol>,
    // Stats snapshot supplied when the cassette was built.
    final_stats: StreamStats,
}
69
/// Outcome of validating a cassette against the golden-fixture rules.
///
/// Returned by [`StreamCassette::validate_golden_fixture`] once a cassette
/// passes every fixture invariant; records where the fixture lives, its format
/// symbol, the packet count, and the final stats.
///
/// A report is only produced on success; failures are returned as errors.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StreamGoldenFixtureReport {
    /// Validated fixture path under [`STREAM_CASSETTE_FIXTURE_ROOT`].
    pub path: String,
    /// Cassette format symbol the fixture was written with.
    pub format: Symbol,
    /// Number of envelopes the fixture contains.
    pub packet_count: usize,
    /// Final accumulated stats captured at the end of the trace.
    pub final_stats: StreamStats,
}
86
87impl StreamCassette {
88    /// Records a cassette by draining every packet from a live stream.
89    ///
90    /// Pulls packets until the stream is exhausted, snapshots its final stats,
91    /// and builds the cassette from the metadata, drained items, and the given
92    /// [`TransportProfile`].
93    pub fn from_stream_value(stream: &StreamValue, profile: TransportProfile) -> Result<Self> {
94        let mut items = Vec::new();
95        while let Some(item) = stream.next_packet()? {
96            items.push(item);
97        }
98        let final_stats = stream.stats()?;
99        Self::from_items(stream.metadata().clone(), items, profile, final_stats)
100    }
101
102    /// Records a cassette from already-drained stream items.
103    ///
104    /// Wraps each item in a sequenced [`StreamEnvelope`] under the given
105    /// [`TransportProfile`], then delegates to
106    /// [`from_envelopes`](StreamCassette::from_envelopes).
107    pub fn from_items(
108        metadata: StreamMetadata,
109        items: Vec<StreamItem>,
110        profile: TransportProfile,
111        final_stats: StreamStats,
112    ) -> Result<Self> {
113        let envelopes = items
114            .iter()
115            .enumerate()
116            .map(|(sequence, item)| {
117                StreamEnvelope::from_item_with_profile(
118                    &metadata,
119                    sequence as u64,
120                    item,
121                    profile.clone(),
122                )
123            })
124            .collect::<Result<Vec<_>>>()?;
125        Self::from_envelopes(metadata, envelopes, final_stats)
126    }
127
128    /// Builds a cassette directly from sequenced envelopes.
129    ///
130    /// Derives the [timing](StreamCassetteTiming) and the deduplicated
131    /// diagnostic set from the envelopes, pairing them with the supplied
132    /// metadata and final stats.
133    pub fn from_envelopes(
134        metadata: StreamMetadata,
135        envelopes: Vec<StreamEnvelope>,
136        final_stats: StreamStats,
137    ) -> Result<Self> {
138        let timing = timing_from_envelopes(&metadata, &envelopes);
139        let diagnostics = diagnostics_from_envelopes(&envelopes);
140        Ok(Self {
141            metadata,
142            envelopes,
143            timing,
144            diagnostics,
145            final_stats,
146        })
147    }
148
149    /// Returns the metadata of the recorded stream.
150    pub fn metadata(&self) -> &StreamMetadata {
151        &self.metadata
152    }
153
154    /// Returns the recorded envelopes in sequence order.
155    pub fn envelopes(&self) -> &[StreamEnvelope] {
156        &self.envelopes
157    }
158
159    /// Returns the derived timing summary for the trace.
160    pub fn timing(&self) -> &StreamCassetteTiming {
161        &self.timing
162    }
163
164    /// Returns the deduplicated diagnostic symbols observed during recording.
165    pub fn diagnostics(&self) -> &[Symbol] {
166        &self.diagnostics
167    }
168
169    /// Returns the final accumulated stats captured at end of trace.
170    pub fn final_stats(&self) -> &StreamStats {
171        &self.final_stats
172    }
173
174    /// Reconstructs the stream items from the recorded envelopes.
175    ///
176    /// Each item pairs an envelope's packet with its captured ticks, ready to
177    /// feed a replay stream.
178    pub fn items(&self) -> Result<Vec<StreamItem>> {
179        self.envelopes
180            .iter()
181            .map(|envelope| {
182                StreamItem::with_ticks(envelope.packet().clone(), envelope.ticks().to_vec())
183            })
184            .collect()
185    }
186
187    /// Rebuilds a live, pull-based [`StreamValue`] from the recorded trace.
188    pub fn replay_stream_value(&self) -> Result<StreamValue> {
189        Ok(StreamValue::pull(self.metadata.clone(), self.items()?))
190    }
191
192    /// Serializes the cassette to an [`Expr`] map keyed by field symbol.
193    ///
194    /// The map carries the format symbol, metadata table, timing, envelope
195    /// list, diagnostics, and final stats, suitable for persistence and
196    /// round-tripping through [`from_expr`](StreamCassette::from_expr).
197    pub fn to_expr(&self) -> Expr {
198        Expr::Map(vec![
199            (
200                Expr::Symbol(Symbol::new("cassette")),
201                Expr::Symbol(stream_cassette_format_symbol()),
202            ),
203            (
204                Expr::Symbol(Symbol::new("metadata")),
205                self.metadata.table_expr(),
206            ),
207            (Expr::Symbol(Symbol::new("timing")), self.timing.to_expr()),
208            (
209                Expr::Symbol(Symbol::new("envelopes")),
210                Expr::List(self.envelopes.iter().map(StreamEnvelope::to_expr).collect()),
211            ),
212            (
213                Expr::Symbol(Symbol::new("diagnostics")),
214                Expr::List(self.diagnostics.iter().cloned().map(Expr::Symbol).collect()),
215            ),
216            (
217                Expr::Symbol(Symbol::new("final-stats")),
218                stream_stats_expr(&self.final_stats),
219            ),
220        ])
221    }
222
223    /// Deserializes a cassette from an [`Expr`] map produced by
224    /// [`to_expr`](StreamCassette::to_expr).
225    ///
226    /// Validates the field set and format symbol, then reconstructs metadata,
227    /// envelopes, timing, diagnostics, and final stats. Fails closed on an
228    /// unknown format, missing or unexpected fields, or type mismatches.
229    pub fn from_expr(expr: &Expr) -> Result<Self> {
230        let Expr::Map(entries) = expr else {
231            return Err(Error::TypeMismatch {
232                expected: "stream cassette map",
233                found: expr_kind(expr),
234            });
235        };
236        ensure_fields(
237            entries,
238            &[
239                "cassette",
240                "metadata",
241                "timing",
242                "envelopes",
243                "diagnostics",
244                "final-stats",
245            ],
246        )?;
247        let format = symbol_field(entries, "cassette")?;
248        if *format != stream_cassette_format_symbol() {
249            return Err(Error::Eval(format!(
250                "unknown stream cassette format {}",
251                format.as_qualified_str()
252            )));
253        }
254        let metadata = StreamMetadata::from_table_expr(field(entries, "metadata")?)?;
255        let envelopes = list_field(entries, "envelopes")?
256            .iter()
257            .map(|expr| StreamEnvelope::try_from(expr.clone()))
258            .collect::<Result<Vec<_>>>()?;
259        let metadata = restore_metadata_id(metadata, &envelopes);
260        let timing = StreamCassetteTiming::from_expr(field(entries, "timing")?)?;
261        let diagnostics = symbol_list(entries, "diagnostics")?;
262        let final_stats = stream_stats_from_expr(field(entries, "final-stats")?)?;
263        Ok(Self {
264            metadata,
265            envelopes,
266            timing,
267            diagnostics,
268            final_stats,
269        })
270    }
271
272    /// Returns a copy with host-device names and private payloads redacted.
273    ///
274    /// Redacts the metadata, every envelope, and the diagnostic symbols so the
275    /// result is safe to persist as a golden fixture.
276    pub fn redacted(&self) -> Result<Self> {
277        let metadata = redact_metadata(&self.metadata);
278        let envelopes = self
279            .envelopes
280            .iter()
281            .map(redact_envelope)
282            .collect::<Result<Vec<_>>>()?;
283        let mut redacted = Self::from_envelopes(metadata, envelopes, self.final_stats.clone())?;
284        redacted.diagnostics = self.diagnostics.iter().map(redact_symbol).collect();
285        Ok(redacted)
286    }
287
288    /// Validates the cassette as a golden fixture at `path`.
289    ///
290    /// Checks the path lives under [`STREAM_CASSETTE_FIXTURE_ROOT`] with the
291    /// cassette extension, that the trace is finite, that envelope sequences
292    /// match their packet index, that each transport profile is replayable or
293    /// previewable but never realtime, and that no unredacted payload or
294    /// host-device name remains. Returns a [`StreamGoldenFixtureReport`] on
295    /// success, or an error describing the first failed invariant.
296    pub fn validate_golden_fixture(&self, path: &str) -> Result<StreamGoldenFixtureReport> {
297        validate_fixture_path(path)?;
298        if !self.timing.finite {
299            return Err(Error::Eval(
300                "golden stream fixture must be finite".to_owned(),
301            ));
302        }
303        for (index, envelope) in self.envelopes.iter().enumerate() {
304            if envelope.sequence() != index as u64 {
305                return Err(Error::Eval(format!(
306                    "golden stream fixture sequence {} is not packet index {index}",
307                    envelope.sequence()
308                )));
309            }
310            if !envelope
311                .profile()
312                .has_capability(StreamCapability::Replayable)
313                && !envelope.profile().has_capability(StreamCapability::Preview)
314            {
315                return Err(Error::Eval(format!(
316                    "golden stream fixture profile {} is not replayable or previewable",
317                    envelope.profile().name()
318                )));
319            }
320            if envelope
321                .profile()
322                .has_capability(StreamCapability::Realtime)
323            {
324                return Err(Error::Eval(
325                    "golden stream fixture cannot require realtime transport".to_owned(),
326                ));
327            }
328            if packet_has_private_payload(envelope.packet()) || envelope_has_host_device(envelope) {
329                return Err(Error::Eval(
330                    "golden stream fixture contains unredacted payload".to_owned(),
331                ));
332            }
333        }
334        if metadata_has_host_device(&self.metadata)
335            || is_host_device_symbol(&self.timing.clock)
336            || self.diagnostics.iter().any(is_host_device_symbol)
337        {
338            return Err(Error::Eval(
339                "golden stream fixture contains an unredacted host device name".to_owned(),
340            ));
341        }
342        Ok(StreamGoldenFixtureReport {
343            path: path.to_owned(),
344            format: stream_cassette_format_symbol(),
345            packet_count: self.envelopes.len(),
346            final_stats: self.final_stats.clone(),
347        })
348    }
349}
350
351impl StreamCassetteTiming {
352    /// Serializes the timing summary to an [`Expr`] map keyed by field symbol.
353    pub fn to_expr(&self) -> Expr {
354        Expr::Map(vec![
355            (
356                Expr::Symbol(Symbol::new("clock")),
357                Expr::Symbol(self.clock.clone()),
358            ),
359            (
360                Expr::Symbol(Symbol::new("packet-count")),
361                Expr::String(self.packet_count.to_string()),
362            ),
363            (
364                Expr::Symbol(Symbol::new("first-sequence")),
365                optional_u64_expr(self.first_sequence),
366            ),
367            (
368                Expr::Symbol(Symbol::new("last-sequence")),
369                optional_u64_expr(self.last_sequence),
370            ),
371            (Expr::Symbol(Symbol::new("finite")), Expr::Bool(self.finite)),
372        ])
373    }
374
375    /// Deserializes a timing summary from an [`Expr`] map produced by
376    /// [`to_expr`](StreamCassetteTiming::to_expr).
377    ///
378    /// Validates the field set and fails closed on missing or unexpected
379    /// fields or type mismatches.
380    pub fn from_expr(expr: &Expr) -> Result<Self> {
381        let Expr::Map(entries) = expr else {
382            return Err(Error::TypeMismatch {
383                expected: "stream cassette timing map",
384                found: expr_kind(expr),
385            });
386        };
387        ensure_fields(
388            entries,
389            &[
390                "clock",
391                "packet-count",
392                "first-sequence",
393                "last-sequence",
394                "finite",
395            ],
396        )?;
397        Ok(Self {
398            clock: symbol_field(entries, "clock")?.clone(),
399            packet_count: parse_usize(entries, "packet-count")?,
400            first_sequence: optional_u64(field(entries, "first-sequence")?)?,
401            last_sequence: optional_u64(field(entries, "last-sequence")?)?,
402            finite: bool_field(entries, "finite")?,
403        })
404    }
405}
406
407/// Returns the format symbol stamped into every serialized cassette.
408pub fn stream_cassette_format_symbol() -> Symbol {
409    Symbol::qualified("stream/cassette", "v1")
410}
411
/// Returns the repository-relative root for golden stream fixtures.
///
/// This is the value of [`STREAM_CASSETTE_FIXTURE_ROOT`].
pub fn stream_cassette_golden_root() -> &'static str {
    STREAM_CASSETTE_FIXTURE_ROOT
}
416
/// Returns the file extension (without leading dot) for golden fixtures.
///
/// This is the value of [`STREAM_CASSETTE_EXTENSION`].
pub fn stream_cassette_golden_extension() -> &'static str {
    STREAM_CASSETTE_EXTENSION
}
421
422fn timing_from_envelopes(
423    metadata: &StreamMetadata,
424    envelopes: &[StreamEnvelope],
425) -> StreamCassetteTiming {
426    StreamCassetteTiming {
427        clock: metadata.clock().clone(),
428        packet_count: envelopes.len(),
429        first_sequence: envelopes.first().map(StreamEnvelope::sequence),
430        last_sequence: envelopes.last().map(StreamEnvelope::sequence),
431        finite: true,
432    }
433}
434
435fn restore_metadata_id(metadata: StreamMetadata, envelopes: &[StreamEnvelope]) -> StreamMetadata {
436    let Some(first) = envelopes.first() else {
437        return metadata;
438    };
439    if metadata.id().as_qualified_str() != first.stream_id().as_qualified_str() {
440        return metadata;
441    }
442    StreamMetadata::new(
443        first.stream_id().clone(),
444        metadata.media(),
445        metadata.direction(),
446        metadata.clock().clone(),
447        metadata.buffer().clone(),
448    )
449}
450
451fn diagnostics_from_envelopes(envelopes: &[StreamEnvelope]) -> Vec<Symbol> {
452    let mut diagnostics = Vec::new();
453    for envelope in envelopes {
454        for diagnostic in envelope.diagnostics() {
455            push_unique(&mut diagnostics, diagnostic.clone());
456        }
457        if let StreamPacket::Diagnostic(packet) = envelope.packet() {
458            push_unique(&mut diagnostics, packet.kind().clone());
459        }
460    }
461    diagnostics
462}
463
464fn push_unique(symbols: &mut Vec<Symbol>, symbol: Symbol) {
465    if !symbols.contains(&symbol) {
466        symbols.push(symbol);
467    }
468}
469
470fn validate_fixture_path(path: &str) -> Result<()> {
471    let Some(relative) = path.strip_prefix(STREAM_CASSETTE_FIXTURE_ROOT) else {
472        return Err(Error::Eval(format!(
473            "golden stream fixture path must live under {STREAM_CASSETTE_FIXTURE_ROOT}"
474        )));
475    };
476    if !relative.starts_with('/') || relative == "/" {
477        return Err(Error::Eval(format!(
478            "golden stream fixture path must live under {STREAM_CASSETTE_FIXTURE_ROOT}"
479        )));
480    }
481    let expected_extension = format!(".{STREAM_CASSETTE_EXTENSION}");
482    if !path.ends_with(&expected_extension) {
483        return Err(Error::Eval(format!(
484            "golden stream fixture path must end in .{STREAM_CASSETTE_EXTENSION}"
485        )));
486    }
487    Ok(())
488}
489
490fn ensure_fields(entries: &[(Expr, Expr)], allowed: &[&str]) -> Result<()> {
491    for (key, _) in entries {
492        let Expr::Symbol(symbol) = key else {
493            return Err(Error::TypeMismatch {
494                expected: "symbol stream cassette field",
495                found: expr_kind(key),
496            });
497        };
498        if symbol.namespace.is_none() && allowed.contains(&symbol.name.as_ref()) {
499            continue;
500        }
501        return Err(Error::Eval(format!(
502            "unknown stream cassette field {}",
503            symbol.as_qualified_str()
504        )));
505    }
506    Ok(())
507}
508
509fn list_field<'a>(entries: &'a [(Expr, Expr)], name: &str) -> Result<&'a [Expr]> {
510    match field(entries, name)? {
511        Expr::List(items) => Ok(items),
512        other => Err(Error::TypeMismatch {
513            expected: "list field",
514            found: expr_kind(other),
515        }),
516    }
517}
518
519fn symbol_list(entries: &[(Expr, Expr)], name: &str) -> Result<Vec<Symbol>> {
520    list_field(entries, name)?
521        .iter()
522        .map(|expr| match expr {
523            Expr::Symbol(symbol) => Ok(symbol.clone()),
524            other => Err(Error::TypeMismatch {
525                expected: "symbol list item",
526                found: expr_kind(other),
527            }),
528        })
529        .collect()
530}
531
532fn parse_usize(entries: &[(Expr, Expr)], name: &str) -> Result<usize> {
533    string_field(entries, name)?
534        .parse::<usize>()
535        .map_err(|err| Error::Eval(format!("invalid stream cassette {name}: {err}")))
536}
537
538fn optional_u64(expr: &Expr) -> Result<Option<u64>> {
539    match expr {
540        Expr::Nil => Ok(None),
541        Expr::String(value) => value
542            .parse::<u64>()
543            .map(Some)
544            .map_err(|err| Error::Eval(format!("invalid stream cassette sequence: {err}"))),
545        other => Err(Error::TypeMismatch {
546            expected: "optional u64 string",
547            found: expr_kind(other),
548        }),
549    }
550}
551
552fn optional_u64_expr(value: Option<u64>) -> Expr {
553    value
554        .map(|value| Expr::String(value.to_string()))
555        .unwrap_or(Expr::Nil)
556}
557
558fn bool_field(entries: &[(Expr, Expr)], name: &str) -> Result<bool> {
559    match field(entries, name)? {
560        Expr::Bool(value) => Ok(*value),
561        other => Err(Error::TypeMismatch {
562            expected: "bool field",
563            found: expr_kind(other),
564        }),
565    }
566}