use sim_kernel::{Error, Expr, Result, Symbol, Tick};
use crate::{
BufferPolicy, ClockDomain, LatencyClass, StreamCapability, StreamCassette, StreamDirection,
StreamEnvelope, StreamFaultKind, StreamFaultPlan, StreamItem, StreamMedia, StreamMetadata,
StreamPacket, StreamStats, TransportProfile,
};
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct MediaDescriptor {
symbol: Symbol,
stream_media: StreamMedia,
}
impl MediaDescriptor {
pub fn named(name: impl AsRef<str>) -> Result<Self> {
let name = name.as_ref();
if let Some(kind) = name.strip_prefix("ide/event/") {
return dev_event_media(kind);
}
let symbol = match name {
"stream/media/pcm" => StreamMedia::Pcm.symbol(),
"stream/media/midi" => StreamMedia::Midi.symbol(),
"stream/media/diagnostic" => StreamMedia::Diagnostic.symbol(),
"stream/media/data" => StreamMedia::Data.symbol(),
other => {
return Err(Error::Eval(format!(
"unsupported stream media descriptor {other}"
)));
}
};
let stream_media = StreamMedia::from_symbol(&symbol)?;
Ok(Self {
symbol,
stream_media,
})
}
pub fn symbol(&self) -> &Symbol {
&self.symbol
}
pub fn stream_media(&self) -> StreamMedia {
self.stream_media
}
}
pub fn dev_event_media(kind: &str) -> Result<MediaDescriptor> {
validate_dev_event_kind(kind)?;
Ok(MediaDescriptor {
symbol: Symbol::qualified("ide/event", kind),
stream_media: StreamMedia::Data,
})
}
pub fn dev_event_metadata(stream_id: Symbol) -> Result<StreamMetadata> {
Ok(StreamMetadata::new(
stream_id,
StreamMedia::Data,
StreamDirection::Source,
ClockDomain::ServerFrame.symbol(),
BufferPolicy::bounded(128)?,
))
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DevEvent {
kind: String,
atelier_node: Symbol,
latency_class: LatencyClass,
payload: Expr,
ticks: Vec<Tick>,
}
impl DevEvent {
pub fn new(
kind: impl Into<String>,
atelier_node: Symbol,
latency_class: LatencyClass,
payload: Expr,
) -> Result<Self> {
let kind = kind.into();
validate_dev_event_kind(&kind)?;
Ok(Self {
kind,
atelier_node,
latency_class,
payload,
ticks: Vec::new(),
})
}
pub fn edit(atelier_node: Symbol, payload: Expr) -> Result<Self> {
Self::new("edit", atelier_node, LatencyClass::Interactive, payload)
}
pub fn validate(atelier_node: Symbol, payload: Expr) -> Result<Self> {
Self::new(
"validate",
atelier_node,
LatencyClass::OfflineRender,
payload,
)
}
pub fn refusal(atelier_node: Symbol, payload: Expr) -> Result<Self> {
Self::new("refusal", atelier_node, LatencyClass::Interactive, payload)
}
pub fn with_ticks(mut self, ticks: Vec<Tick>) -> Result<Self> {
sim_kernel::validate_ticks(&ticks)?;
self.ticks = ticks;
Ok(self)
}
pub fn kind(&self) -> &str {
&self.kind
}
pub fn atelier_node(&self) -> &Symbol {
&self.atelier_node
}
pub fn latency_class(&self) -> LatencyClass {
self.latency_class
}
pub fn stream_item(&self) -> Result<StreamItem> {
StreamItem::with_ticks(
StreamPacket::data(
dev_event_media(&self.kind)?.symbol().clone(),
self.payload_expr(),
),
self.ticks.clone(),
)
}
fn transport_profile(&self) -> Result<TransportProfile> {
TransportProfile::new(
Symbol::qualified(
"stream/profile",
format!("dev-{}", self.latency_class.wire_label()),
),
self.latency_class,
vec![
StreamCapability::Deterministic,
StreamCapability::Bounded,
StreamCapability::Replayable,
],
)
}
fn payload_expr(&self) -> Expr {
Expr::Map(vec![
(
Expr::Symbol(Symbol::new("event-kind")),
Expr::Symbol(Symbol::qualified("ide/event", self.kind.clone())),
),
(
Expr::Symbol(Symbol::new("atelier-node")),
Expr::Symbol(self.atelier_node.clone()),
),
(
Expr::Symbol(Symbol::new("latency-class")),
Expr::Symbol(self.latency_class.symbol()),
),
(Expr::Symbol(Symbol::new("payload")), self.payload.clone()),
])
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DevCassette {
cassette: StreamCassette,
content_hash: String,
}
impl DevCassette {
pub fn from_events(stream_id: Symbol, events: Vec<DevEvent>) -> Result<Self> {
let metadata = dev_event_metadata(stream_id)?;
let envelopes = events
.iter()
.enumerate()
.map(|(sequence, event)| {
StreamEnvelope::from_item_with_profile(
&metadata,
sequence as u64,
&event.stream_item()?,
event.transport_profile()?,
)
})
.collect::<Result<Vec<_>>>()?;
let final_stats = StreamStats {
yielded: envelopes.len() as u64,
closed: true,
..StreamStats::default()
};
Self::from_stream_cassette(StreamCassette::from_envelopes(
metadata,
envelopes,
final_stats,
)?)
}
pub fn from_stream_cassette(cassette: StreamCassette) -> Result<Self> {
let content_hash = cassette_content_hash(&cassette);
Ok(Self {
cassette,
content_hash,
})
}
pub fn cassette(&self) -> &StreamCassette {
&self.cassette
}
pub fn content_hash(&self) -> &str {
&self.content_hash
}
pub fn redacted(&self) -> Result<Self> {
Self::from_stream_cassette(self.cassette.redacted()?)
}
pub fn validate_golden_fixture(&self, path: &str) -> Result<crate::StreamGoldenFixtureReport> {
self.cassette.validate_golden_fixture(path)
}
pub fn replay_content_hash(&self) -> Result<String> {
let items = self.cassette.items()?;
let metadata = self.cassette.metadata().clone();
let envelopes = items
.iter()
.enumerate()
.zip(self.cassette.envelopes())
.map(|((sequence, item), original)| {
StreamEnvelope::from_item_with_profile(
&metadata,
sequence as u64,
item,
original.profile().clone(),
)
})
.collect::<Result<Vec<_>>>()?;
let replay = StreamCassette::from_envelopes(
metadata,
envelopes,
self.cassette.final_stats().clone(),
)?;
Ok(cassette_content_hash(&replay))
}
pub fn replay_with_fault(&self, plan: &StreamFaultPlan) -> Result<DevFaultReport> {
let result = plan.apply(&self.cassette.items()?);
let mut diagnostics = result.diagnostics;
if diagnostics.contains(&StreamFaultKind::Drop.symbol()) {
push_unique(&mut diagnostics, dev_dropped_chunks_diagnostic());
}
Ok(DevFaultReport {
items: result.items,
diagnostics,
})
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DevFaultReport {
pub items: Vec<StreamItem>,
pub diagnostics: Vec<Symbol>,
}
pub fn dev_dropped_chunks_diagnostic() -> Symbol {
Symbol::qualified("dev/diagnostic", "dropped-chunks")
}
fn validate_dev_event_kind(kind: &str) -> Result<()> {
let valid = !kind.is_empty()
&& kind
.bytes()
.all(|byte| byte.is_ascii_lowercase() || byte.is_ascii_digit() || matches!(byte, b'-'));
if valid {
Ok(())
} else {
Err(Error::Eval(format!("invalid dev event kind {kind:?}")))
}
}
fn cassette_content_hash(cassette: &StreamCassette) -> String {
let key = cassette.to_expr().canonical_key();
let mut hash = 0xcbf29ce484222325u64;
hash_bytes(&mut hash, format!("{key:?}").as_bytes());
format!("fnv1a64:{hash:016x}")
}
fn hash_bytes(hash: &mut u64, bytes: &[u8]) {
for byte in bytes {
*hash ^= u64::from(*byte);
*hash = hash.wrapping_mul(0x100000001b3);
}
}
fn push_unique(symbols: &mut Vec<Symbol>, symbol: Symbol) {
if !symbols.contains(&symbol) {
symbols.push(symbol);
}
}