Skip to main content

sim_lib_stream_core/
dev.rs

1//! Development-event media and cassettes for the SIM Atelier.
2
3use sim_kernel::{Error, Expr, Result, Symbol, Tick};
4
5use crate::{
6    BufferPolicy, ClockDomain, LatencyClass, StreamCapability, StreamCassette, StreamDirection,
7    StreamEnvelope, StreamFaultKind, StreamFaultPlan, StreamItem, StreamMedia, StreamMetadata,
8    StreamPacket, StreamStats, TransportProfile,
9};
10
11/// Descriptor for a stream media family carried by [`StreamEnvelope`].
12#[derive(Clone, Debug, PartialEq, Eq)]
13pub struct MediaDescriptor {
14    symbol: Symbol,
15    stream_media: StreamMedia,
16}
17
18impl MediaDescriptor {
19    /// Builds a descriptor from a stable symbolic media name.
20    pub fn named(name: impl AsRef<str>) -> Result<Self> {
21        let name = name.as_ref();
22        if let Some(kind) = name.strip_prefix("ide/event/") {
23            return dev_event_media(kind);
24        }
25        let symbol = match name {
26            "stream/media/pcm" => StreamMedia::Pcm.symbol(),
27            "stream/media/midi" => StreamMedia::Midi.symbol(),
28            "stream/media/diagnostic" => StreamMedia::Diagnostic.symbol(),
29            "stream/media/data" => StreamMedia::Data.symbol(),
30            other => {
31                return Err(Error::Eval(format!(
32                    "unsupported stream media descriptor {other}"
33                )));
34            }
35        };
36        let stream_media = StreamMedia::from_symbol(&symbol)?;
37        Ok(Self {
38            symbol,
39            stream_media,
40        })
41    }
42
43    /// Returns the descriptor symbol.
44    pub fn symbol(&self) -> &Symbol {
45        &self.symbol
46    }
47
48    /// Returns the envelope media used to carry this descriptor.
49    pub fn stream_media(&self) -> StreamMedia {
50        self.stream_media
51    }
52}
53
54/// Returns the descriptor for an `ide/event/<kind>` development event.
55pub fn dev_event_media(kind: &str) -> Result<MediaDescriptor> {
56    validate_dev_event_kind(kind)?;
57    Ok(MediaDescriptor {
58        symbol: Symbol::qualified("ide/event", kind),
59        stream_media: StreamMedia::Data,
60    })
61}
62
63/// Creates a stream metadata record for development events.
64pub fn dev_event_metadata(stream_id: Symbol) -> Result<StreamMetadata> {
65    Ok(StreamMetadata::new(
66        stream_id,
67        StreamMedia::Data,
68        StreamDirection::Source,
69        ClockDomain::ServerFrame.symbol(),
70        BufferPolicy::bounded(128)?,
71    ))
72}
73
74/// One development event before it is wrapped in a [`StreamEnvelope`].
75#[derive(Clone, Debug, PartialEq, Eq)]
76pub struct DevEvent {
77    kind: String,
78    atelier_node: Symbol,
79    latency_class: LatencyClass,
80    payload: Expr,
81    ticks: Vec<Tick>,
82}
83
84impl DevEvent {
85    /// Builds a development event with an explicit latency class.
86    pub fn new(
87        kind: impl Into<String>,
88        atelier_node: Symbol,
89        latency_class: LatencyClass,
90        payload: Expr,
91    ) -> Result<Self> {
92        let kind = kind.into();
93        validate_dev_event_kind(&kind)?;
94        Ok(Self {
95            kind,
96            atelier_node,
97            latency_class,
98            payload,
99            ticks: Vec::new(),
100        })
101    }
102
103    /// Builds an interactive edit event.
104    pub fn edit(atelier_node: Symbol, payload: Expr) -> Result<Self> {
105        Self::new("edit", atelier_node, LatencyClass::Interactive, payload)
106    }
107
108    /// Builds an offline validation event.
109    pub fn validate(atelier_node: Symbol, payload: Expr) -> Result<Self> {
110        Self::new(
111            "validate",
112            atelier_node,
113            LatencyClass::OfflineRender,
114            payload,
115        )
116    }
117
118    /// Builds a refusal event for a denied development action.
119    pub fn refusal(atelier_node: Symbol, payload: Expr) -> Result<Self> {
120        Self::new("refusal", atelier_node, LatencyClass::Interactive, payload)
121    }
122
123    /// Attaches ticks to this event.
124    pub fn with_ticks(mut self, ticks: Vec<Tick>) -> Result<Self> {
125        sim_kernel::validate_ticks(&ticks)?;
126        self.ticks = ticks;
127        Ok(self)
128    }
129
130    /// Returns the development event kind.
131    pub fn kind(&self) -> &str {
132        &self.kind
133    }
134
135    /// Returns the originating Atelier node id.
136    pub fn atelier_node(&self) -> &Symbol {
137        &self.atelier_node
138    }
139
140    /// Returns the event latency class.
141    pub fn latency_class(&self) -> LatencyClass {
142        self.latency_class
143    }
144
145    /// Converts this event to a stream item using `ide/event/<kind>` data.
146    pub fn stream_item(&self) -> Result<StreamItem> {
147        StreamItem::with_ticks(
148            StreamPacket::data(
149                dev_event_media(&self.kind)?.symbol().clone(),
150                self.payload_expr(),
151            ),
152            self.ticks.clone(),
153        )
154    }
155
156    fn transport_profile(&self) -> Result<TransportProfile> {
157        TransportProfile::new(
158            Symbol::qualified(
159                "stream/profile",
160                format!("dev-{}", self.latency_class.wire_label()),
161            ),
162            self.latency_class,
163            vec![
164                StreamCapability::Deterministic,
165                StreamCapability::Bounded,
166                StreamCapability::Replayable,
167            ],
168        )
169    }
170
171    fn payload_expr(&self) -> Expr {
172        Expr::Map(vec![
173            (
174                Expr::Symbol(Symbol::new("event-kind")),
175                Expr::Symbol(Symbol::qualified("ide/event", self.kind.clone())),
176            ),
177            (
178                Expr::Symbol(Symbol::new("atelier-node")),
179                Expr::Symbol(self.atelier_node.clone()),
180            ),
181            (
182                Expr::Symbol(Symbol::new("latency-class")),
183                Expr::Symbol(self.latency_class.symbol()),
184            ),
185            (Expr::Symbol(Symbol::new("payload")), self.payload.clone()),
186        ])
187    }
188}
189
190/// A development cassette backed by the standard [`StreamCassette`] format.
191#[derive(Clone, Debug, PartialEq, Eq)]
192pub struct DevCassette {
193    cassette: StreamCassette,
194    content_hash: String,
195}
196
197impl DevCassette {
198    /// Records a development session into a stream cassette.
199    pub fn from_events(stream_id: Symbol, events: Vec<DevEvent>) -> Result<Self> {
200        let metadata = dev_event_metadata(stream_id)?;
201        let envelopes = events
202            .iter()
203            .enumerate()
204            .map(|(sequence, event)| {
205                StreamEnvelope::from_item_with_profile(
206                    &metadata,
207                    sequence as u64,
208                    &event.stream_item()?,
209                    event.transport_profile()?,
210                )
211            })
212            .collect::<Result<Vec<_>>>()?;
213        let final_stats = StreamStats {
214            yielded: envelopes.len() as u64,
215            closed: true,
216            ..StreamStats::default()
217        };
218        Self::from_stream_cassette(StreamCassette::from_envelopes(
219            metadata,
220            envelopes,
221            final_stats,
222        )?)
223    }
224
225    /// Wraps an existing stream cassette as a development cassette.
226    pub fn from_stream_cassette(cassette: StreamCassette) -> Result<Self> {
227        let content_hash = cassette_content_hash(&cassette);
228        Ok(Self {
229            cassette,
230            content_hash,
231        })
232    }
233
234    /// Returns the underlying stream cassette.
235    pub fn cassette(&self) -> &StreamCassette {
236        &self.cassette
237    }
238
239    /// Returns the deterministic cassette content hash.
240    pub fn content_hash(&self) -> &str {
241        &self.content_hash
242    }
243
244    /// Returns a copy with host paths, host names, and private payloads redacted.
245    pub fn redacted(&self) -> Result<Self> {
246        Self::from_stream_cassette(self.cassette.redacted()?)
247    }
248
249    /// Delegates golden fixture validation to the underlying stream cassette.
250    pub fn validate_golden_fixture(&self, path: &str) -> Result<crate::StreamGoldenFixtureReport> {
251        self.cassette.validate_golden_fixture(path)
252    }
253
254    /// Replays the cassette and recomputes the content hash.
255    pub fn replay_content_hash(&self) -> Result<String> {
256        let items = self.cassette.items()?;
257        let metadata = self.cassette.metadata().clone();
258        let envelopes = items
259            .iter()
260            .enumerate()
261            .zip(self.cassette.envelopes())
262            .map(|((sequence, item), original)| {
263                StreamEnvelope::from_item_with_profile(
264                    &metadata,
265                    sequence as u64,
266                    item,
267                    original.profile().clone(),
268                )
269            })
270            .collect::<Result<Vec<_>>>()?;
271        let replay = StreamCassette::from_envelopes(
272            metadata,
273            envelopes,
274            self.cassette.final_stats().clone(),
275        )?;
276        Ok(cassette_content_hash(&replay))
277    }
278
279    /// Replays the cassette with a stream fault plan applied.
280    pub fn replay_with_fault(&self, plan: &StreamFaultPlan) -> Result<DevFaultReport> {
281        let result = plan.apply(&self.cassette.items()?);
282        let mut diagnostics = result.diagnostics;
283        if diagnostics.contains(&StreamFaultKind::Drop.symbol()) {
284            push_unique(&mut diagnostics, dev_dropped_chunks_diagnostic());
285        }
286        Ok(DevFaultReport {
287            items: result.items,
288            diagnostics,
289        })
290    }
291}
292
293/// Result of replaying a development cassette with an injected fault.
294#[derive(Clone, Debug, PartialEq, Eq)]
295pub struct DevFaultReport {
296    /// Items left after the fault plan is applied.
297    pub items: Vec<StreamItem>,
298    /// Diagnostics emitted by the fault replay.
299    pub diagnostics: Vec<Symbol>,
300}
301
302/// Diagnostic emitted when a cassette replay drops development chunks.
303pub fn dev_dropped_chunks_diagnostic() -> Symbol {
304    Symbol::qualified("dev/diagnostic", "dropped-chunks")
305}
306
307fn validate_dev_event_kind(kind: &str) -> Result<()> {
308    let valid = !kind.is_empty()
309        && kind
310            .bytes()
311            .all(|byte| byte.is_ascii_lowercase() || byte.is_ascii_digit() || matches!(byte, b'-'));
312    if valid {
313        Ok(())
314    } else {
315        Err(Error::Eval(format!("invalid dev event kind {kind:?}")))
316    }
317}
318
319fn cassette_content_hash(cassette: &StreamCassette) -> String {
320    let key = cassette.to_expr().canonical_key();
321    let mut hash = 0xcbf29ce484222325u64;
322    hash_bytes(&mut hash, format!("{key:?}").as_bytes());
323    format!("fnv1a64:{hash:016x}")
324}
325
/// Folds `bytes` into the running 64-bit FNV-1a state in `hash`.
///
/// For each byte the state is XORed with the byte and then multiplied by the
/// FNV prime with wrapping arithmetic. An empty slice leaves `hash` unchanged,
/// so the function can be called repeatedly to hash data in pieces.
fn hash_bytes(hash: &mut u64, bytes: &[u8]) {
    const FNV_PRIME: u64 = 0x100000001b3;

    *hash = bytes.iter().fold(*hash, |state, &byte| {
        (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    });
}
332
333fn push_unique(symbols: &mut Vec<Symbol>, symbol: Symbol) {
334    if !symbols.contains(&symbol) {
335        symbols.push(symbol);
336    }
337}