use sim_kernel::{Error, Expr, Result, Symbol};
#[path = "cassette/redaction.rs"]
mod redaction;
#[path = "cassette/stats.rs"]
mod stats;
use crate::buffer::{expr_kind, field, string_field, symbol_field};
use crate::{
StreamCapability, StreamEnvelope, StreamItem, StreamMetadata, StreamPacket, StreamStats,
StreamValue, TransportProfile,
};
use redaction::{
envelope_has_host_device, is_host_device_symbol, metadata_has_host_device,
packet_has_private_payload, redact_envelope, redact_metadata, redact_symbol,
};
use stats::{stream_stats_expr, stream_stats_from_expr};
/// Path prefix that every golden stream fixture path must start with.
pub const STREAM_CASSETTE_FIXTURE_ROOT: &str = "fixtures/streams/golden";
/// File extension (without the leading dot) required of golden stream fixture paths.
pub const STREAM_CASSETTE_EXTENSION: &str = "simcassette";
/// Timing summary stored alongside a cassette's envelopes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StreamCassetteTiming {
/// Clock symbol; copied from the stream metadata when the timing is derived from envelopes.
pub clock: Symbol,
/// Number of recorded envelopes.
pub packet_count: usize,
/// Sequence number of the first envelope, or `None` when nothing was recorded.
pub first_sequence: Option<u64>,
/// Sequence number of the last envelope, or `None` when nothing was recorded.
pub last_sequence: Option<u64>,
/// Whether the recording is finite. Always `true` when the timing is derived from
/// envelopes; golden fixture validation rejects cassettes where this is `false`.
pub finite: bool,
}
/// A recorded stream: its metadata, the recorded envelopes in order, and summary
/// information (timing, diagnostics, final stats) about the recording.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StreamCassette {
// Metadata of the stream that was recorded.
metadata: StreamMetadata,
// Recorded envelopes, in recording order.
envelopes: Vec<StreamEnvelope>,
// Timing summary; derived from the envelopes or read back from a serialized cassette.
timing: StreamCassetteTiming,
// Diagnostic symbols associated with the recording, without duplicates when derived
// from the envelopes.
diagnostics: Vec<Symbol>,
// Stream statistics captured when the recording ended.
final_stats: StreamStats,
}
/// Summary returned when a cassette passes golden fixture validation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StreamGoldenFixtureReport {
/// The fixture path that was validated, as supplied by the caller.
pub path: String,
/// Cassette format symbol (see `stream_cassette_format_symbol`).
pub format: Symbol,
/// Number of envelopes in the validated cassette.
pub packet_count: usize,
/// Copy of the cassette's final stream statistics.
pub final_stats: StreamStats,
}
impl StreamCassette {
pub fn from_stream_value(stream: &StreamValue, profile: TransportProfile) -> Result<Self> {
let mut items = Vec::new();
while let Some(item) = stream.next_packet()? {
items.push(item);
}
let final_stats = stream.stats()?;
Self::from_items(stream.metadata().clone(), items, profile, final_stats)
}
pub fn from_items(
metadata: StreamMetadata,
items: Vec<StreamItem>,
profile: TransportProfile,
final_stats: StreamStats,
) -> Result<Self> {
let envelopes = items
.iter()
.enumerate()
.map(|(sequence, item)| {
StreamEnvelope::from_item_with_profile(
&metadata,
sequence as u64,
item,
profile.clone(),
)
})
.collect::<Result<Vec<_>>>()?;
Self::from_envelopes(metadata, envelopes, final_stats)
}
pub fn from_envelopes(
metadata: StreamMetadata,
envelopes: Vec<StreamEnvelope>,
final_stats: StreamStats,
) -> Result<Self> {
let timing = timing_from_envelopes(&metadata, &envelopes);
let diagnostics = diagnostics_from_envelopes(&envelopes);
Ok(Self {
metadata,
envelopes,
timing,
diagnostics,
final_stats,
})
}
pub fn metadata(&self) -> &StreamMetadata {
&self.metadata
}
pub fn envelopes(&self) -> &[StreamEnvelope] {
&self.envelopes
}
pub fn timing(&self) -> &StreamCassetteTiming {
&self.timing
}
pub fn diagnostics(&self) -> &[Symbol] {
&self.diagnostics
}
pub fn final_stats(&self) -> &StreamStats {
&self.final_stats
}
pub fn items(&self) -> Result<Vec<StreamItem>> {
self.envelopes
.iter()
.map(|envelope| {
StreamItem::with_ticks(envelope.packet().clone(), envelope.ticks().to_vec())
})
.collect()
}
pub fn replay_stream_value(&self) -> Result<StreamValue> {
Ok(StreamValue::pull(self.metadata.clone(), self.items()?))
}
pub fn to_expr(&self) -> Expr {
Expr::Map(vec![
(
Expr::Symbol(Symbol::new("cassette")),
Expr::Symbol(stream_cassette_format_symbol()),
),
(
Expr::Symbol(Symbol::new("metadata")),
self.metadata.table_expr(),
),
(Expr::Symbol(Symbol::new("timing")), self.timing.to_expr()),
(
Expr::Symbol(Symbol::new("envelopes")),
Expr::List(self.envelopes.iter().map(StreamEnvelope::to_expr).collect()),
),
(
Expr::Symbol(Symbol::new("diagnostics")),
Expr::List(self.diagnostics.iter().cloned().map(Expr::Symbol).collect()),
),
(
Expr::Symbol(Symbol::new("final-stats")),
stream_stats_expr(&self.final_stats),
),
])
}
pub fn from_expr(expr: &Expr) -> Result<Self> {
let Expr::Map(entries) = expr else {
return Err(Error::TypeMismatch {
expected: "stream cassette map",
found: expr_kind(expr),
});
};
ensure_fields(
entries,
&[
"cassette",
"metadata",
"timing",
"envelopes",
"diagnostics",
"final-stats",
],
)?;
let format = symbol_field(entries, "cassette")?;
if *format != stream_cassette_format_symbol() {
return Err(Error::Eval(format!(
"unknown stream cassette format {}",
format.as_qualified_str()
)));
}
let metadata = StreamMetadata::from_table_expr(field(entries, "metadata")?)?;
let envelopes = list_field(entries, "envelopes")?
.iter()
.map(|expr| StreamEnvelope::try_from(expr.clone()))
.collect::<Result<Vec<_>>>()?;
let metadata = restore_metadata_id(metadata, &envelopes);
let timing = StreamCassetteTiming::from_expr(field(entries, "timing")?)?;
let diagnostics = symbol_list(entries, "diagnostics")?;
let final_stats = stream_stats_from_expr(field(entries, "final-stats")?)?;
Ok(Self {
metadata,
envelopes,
timing,
diagnostics,
final_stats,
})
}
pub fn redacted(&self) -> Result<Self> {
let metadata = redact_metadata(&self.metadata);
let envelopes = self
.envelopes
.iter()
.map(redact_envelope)
.collect::<Result<Vec<_>>>()?;
let mut redacted = Self::from_envelopes(metadata, envelopes, self.final_stats.clone())?;
redacted.diagnostics = self.diagnostics.iter().map(redact_symbol).collect();
Ok(redacted)
}
pub fn validate_golden_fixture(&self, path: &str) -> Result<StreamGoldenFixtureReport> {
validate_fixture_path(path)?;
if !self.timing.finite {
return Err(Error::Eval(
"golden stream fixture must be finite".to_owned(),
));
}
for (index, envelope) in self.envelopes.iter().enumerate() {
if envelope.sequence() != index as u64 {
return Err(Error::Eval(format!(
"golden stream fixture sequence {} is not packet index {index}",
envelope.sequence()
)));
}
if !envelope
.profile()
.has_capability(StreamCapability::Replayable)
&& !envelope.profile().has_capability(StreamCapability::Preview)
{
return Err(Error::Eval(format!(
"golden stream fixture profile {} is not replayable or previewable",
envelope.profile().name()
)));
}
if envelope
.profile()
.has_capability(StreamCapability::Realtime)
{
return Err(Error::Eval(
"golden stream fixture cannot require realtime transport".to_owned(),
));
}
if packet_has_private_payload(envelope.packet()) || envelope_has_host_device(envelope) {
return Err(Error::Eval(
"golden stream fixture contains unredacted payload".to_owned(),
));
}
}
if metadata_has_host_device(&self.metadata)
|| is_host_device_symbol(&self.timing.clock)
|| self.diagnostics.iter().any(is_host_device_symbol)
{
return Err(Error::Eval(
"golden stream fixture contains an unredacted host device name".to_owned(),
));
}
Ok(StreamGoldenFixtureReport {
path: path.to_owned(),
format: stream_cassette_format_symbol(),
packet_count: self.envelopes.len(),
final_stats: self.final_stats.clone(),
})
}
}
impl StreamCassetteTiming {
    /// Serializes the timing summary as a map with the keys `clock`,
    /// `packet-count`, `first-sequence`, `last-sequence` and `finite`.
    ///
    /// Counts and sequence numbers are written as decimal strings; an absent
    /// sequence is written as nil.
    pub fn to_expr(&self) -> Expr {
        let key = |name: &'static str| Expr::Symbol(Symbol::new(name));
        let entries = vec![
            (key("clock"), Expr::Symbol(self.clock.clone())),
            (
                key("packet-count"),
                Expr::String(self.packet_count.to_string()),
            ),
            (
                key("first-sequence"),
                optional_u64_expr(self.first_sequence),
            ),
            (key("last-sequence"), optional_u64_expr(self.last_sequence)),
            (key("finite"), Expr::Bool(self.finite)),
        ];
        Expr::Map(entries)
    }

    /// Parses the map produced by [`StreamCassetteTiming::to_expr`].
    ///
    /// # Errors
    ///
    /// Fails when `expr` is not a map, when a key is not one of the known
    /// unqualified field symbols, or when a field has the wrong type or does
    /// not parse as a number.
    pub fn from_expr(expr: &Expr) -> Result<Self> {
        let entries = match expr {
            Expr::Map(entries) => entries,
            other => {
                return Err(Error::TypeMismatch {
                    expected: "stream cassette timing map",
                    found: expr_kind(other),
                });
            }
        };
        let known_fields = [
            "clock",
            "packet-count",
            "first-sequence",
            "last-sequence",
            "finite",
        ];
        ensure_fields(entries, &known_fields)?;

        // Fields are read in declaration order so the first invalid one is reported.
        let clock = symbol_field(entries, "clock")?.clone();
        let packet_count = parse_usize(entries, "packet-count")?;
        let first_sequence = optional_u64(field(entries, "first-sequence")?)?;
        let last_sequence = optional_u64(field(entries, "last-sequence")?)?;
        let finite = bool_field(entries, "finite")?;
        Ok(Self {
            clock,
            packet_count,
            first_sequence,
            last_sequence,
            finite,
        })
    }
}
/// Returns the format symbol written under the `cassette` key of a serialized
/// cassette and required when parsing one back.
pub fn stream_cassette_format_symbol() -> Symbol {
Symbol::qualified("stream/cassette", "v1")
}
/// Returns [`STREAM_CASSETTE_FIXTURE_ROOT`], the prefix golden fixture paths must start with.
pub fn stream_cassette_golden_root() -> &'static str {
STREAM_CASSETTE_FIXTURE_ROOT
}
/// Returns [`STREAM_CASSETTE_EXTENSION`], the extension (no leading dot) golden fixture
/// paths must end with.
pub fn stream_cassette_golden_extension() -> &'static str {
STREAM_CASSETTE_EXTENSION
}
/// Derives the timing summary for a set of envelopes.
///
/// The clock is cloned from `metadata`, the packet count is the number of
/// envelopes, and the first/last sequences come from the first/last envelope
/// (`None` when `envelopes` is empty). The result is always marked finite.
fn timing_from_envelopes(
    metadata: &StreamMetadata,
    envelopes: &[StreamEnvelope],
) -> StreamCassetteTiming {
    let first_sequence = envelopes.first().map(|envelope| envelope.sequence());
    let last_sequence = envelopes.last().map(|envelope| envelope.sequence());
    let clock = metadata.clock().clone();
    StreamCassetteTiming {
        clock,
        packet_count: envelopes.len(),
        first_sequence,
        last_sequence,
        finite: true,
    }
}
/// Replaces the metadata's stream id with the first envelope's id when both
/// render to the same qualified string.
///
/// With no envelopes, or when the two ids render differently, `metadata` is
/// returned unchanged. Otherwise a new `StreamMetadata` is built from the
/// envelope's id and the remaining fields of `metadata`.
// NOTE(review): presumably this recovers the exact `Symbol` structure that the
// table form of the metadata does not preserve — confirm against
// `StreamMetadata::from_table_expr`.
fn restore_metadata_id(metadata: StreamMetadata, envelopes: &[StreamEnvelope]) -> StreamMetadata {
    match envelopes.first() {
        Some(first)
            if metadata.id().as_qualified_str() == first.stream_id().as_qualified_str() =>
        {
            StreamMetadata::new(
                first.stream_id().clone(),
                metadata.media(),
                metadata.direction(),
                metadata.clock().clone(),
                metadata.buffer().clone(),
            )
        }
        _ => metadata,
    }
}
/// Collects the distinct diagnostic symbols carried by `envelopes`, in first-seen order.
///
/// For each envelope, its attached diagnostics are considered first, followed
/// by the kind of its packet when that packet is a diagnostic packet.
fn diagnostics_from_envelopes(envelopes: &[StreamEnvelope]) -> Vec<Symbol> {
    envelopes.iter().fold(Vec::new(), |mut seen, envelope| {
        for attached in envelope.diagnostics() {
            push_unique(&mut seen, attached.clone());
        }
        if let StreamPacket::Diagnostic(diagnostic_packet) = envelope.packet() {
            push_unique(&mut seen, diagnostic_packet.kind().clone());
        }
        seen
    })
}
/// Appends `symbol` to `symbols` unless an equal symbol is already present.
fn push_unique(symbols: &mut Vec<Symbol>, symbol: Symbol) {
    let already_present = symbols.iter().any(|existing| *existing == symbol);
    if already_present {
        return;
    }
    symbols.push(symbol);
}
fn validate_fixture_path(path: &str) -> Result<()> {
let Some(relative) = path.strip_prefix(STREAM_CASSETTE_FIXTURE_ROOT) else {
return Err(Error::Eval(format!(
"golden stream fixture path must live under {STREAM_CASSETTE_FIXTURE_ROOT}"
)));
};
if !relative.starts_with('/') || relative == "/" {
return Err(Error::Eval(format!(
"golden stream fixture path must live under {STREAM_CASSETTE_FIXTURE_ROOT}"
)));
}
let expected_extension = format!(".{STREAM_CASSETTE_EXTENSION}");
if !path.ends_with(&expected_extension) {
return Err(Error::Eval(format!(
"golden stream fixture path must end in .{STREAM_CASSETTE_EXTENSION}"
)));
}
Ok(())
}
/// Rejects map entries whose key is not an unqualified symbol named in `allowed`.
///
/// Only the keys are inspected; this does not check that every allowed field
/// is present.
///
/// # Errors
///
/// Returns `Error::TypeMismatch` for a non-symbol key and `Error::Eval` for a
/// symbol that is namespaced or not listed in `allowed`.
fn ensure_fields(entries: &[(Expr, Expr)], allowed: &[&str]) -> Result<()> {
    for (key, _) in entries {
        let symbol = match key {
            Expr::Symbol(symbol) => symbol,
            other => {
                return Err(Error::TypeMismatch {
                    expected: "symbol stream cassette field",
                    found: expr_kind(other),
                });
            }
        };
        let recognized = symbol.namespace.is_none() && allowed.contains(&symbol.name.as_ref());
        if !recognized {
            return Err(Error::Eval(format!(
                "unknown stream cassette field {}",
                symbol.as_qualified_str()
            )));
        }
    }
    Ok(())
}
fn list_field<'a>(entries: &'a [(Expr, Expr)], name: &str) -> Result<&'a [Expr]> {
match field(entries, name)? {
Expr::List(items) => Ok(items),
other => Err(Error::TypeMismatch {
expected: "list field",
found: expr_kind(other),
}),
}
}
fn symbol_list(entries: &[(Expr, Expr)], name: &str) -> Result<Vec<Symbol>> {
list_field(entries, name)?
.iter()
.map(|expr| match expr {
Expr::Symbol(symbol) => Ok(symbol.clone()),
other => Err(Error::TypeMismatch {
expected: "symbol list item",
found: expr_kind(other),
}),
})
.collect()
}
fn parse_usize(entries: &[(Expr, Expr)], name: &str) -> Result<usize> {
string_field(entries, name)?
.parse::<usize>()
.map_err(|err| Error::Eval(format!("invalid stream cassette {name}: {err}")))
}
fn optional_u64(expr: &Expr) -> Result<Option<u64>> {
match expr {
Expr::Nil => Ok(None),
Expr::String(value) => value
.parse::<u64>()
.map(Some)
.map_err(|err| Error::Eval(format!("invalid stream cassette sequence: {err}"))),
other => Err(Error::TypeMismatch {
expected: "optional u64 string",
found: expr_kind(other),
}),
}
}
/// Encodes an optional sequence number: `Some(n)` as the decimal string of `n`, `None` as nil.
fn optional_u64_expr(value: Option<u64>) -> Expr {
    match value {
        Some(sequence) => Expr::String(sequence.to_string()),
        None => Expr::Nil,
    }
}
fn bool_field(entries: &[(Expr, Expr)], name: &str) -> Result<bool> {
match field(entries, name)? {
Expr::Bool(value) => Ok(*value),
other => Err(Error::TypeMismatch {
expected: "bool field",
found: expr_kind(other),
}),
}
}