Skip to main content

sim_lib_stream_core/
envelope.rs

1//! Stream boundary contract: the [`StreamEnvelope`] that wraps every packet
2//! crossing the streaming fabric.
3//!
4//! An envelope binds a [`StreamPacket`] to the routing and timing metadata a
5//! transport needs to carry it: the originating stream and packet ids, the
6//! media and direction, a monotonic sequence number, the [`Tick`]s that locate
7//! it on its clocks, the [`ClockDomain`]s it rides, the [`TransportProfile`]
8//! that bounds what the transport may do, and any diagnostics raised along the
9//! way.
10//!
11//! The kernel owns the protocol vocabulary referenced here -- [`Symbol`],
12//! [`Expr`], [`Tick`], and the clock/capability contracts. This module supplies
13//! the concrete envelope behavior: construction with validation, the wire form
14//! ([`StreamEnvelope::to_expr`] / [`TryFrom<Expr>`]), and the mapping of
15//! clock-domain symbols to the [`ClockDomain`] enum.
16
17use std::str::FromStr;
18
19#[path = "envelope/profile.rs"]
20mod profile;
21#[path = "envelope/ref_codec.rs"]
22mod ref_codec;
23
24use sim_kernel::{Error, Expr, Result, Symbol, Tick};
25
26use crate::buffer::{expr_kind, field, string_field, symbol_field};
27use crate::{StreamDirection, StreamItem, StreamMedia, StreamMetadata, StreamPacket};
28pub use profile::{LatencyClass, StreamCapability, TransportProfile};
29use ref_codec::{ref_expr, ref_from_expr};
30
/// Version number of the [`StreamEnvelope`] map wire form.
///
/// [`StreamEnvelope::to_expr`] writes this value under the `version` key, and
/// decoding through [`TryFrom<Expr>`] refuses any envelope whose `version`
/// field holds a different number.
pub const STREAM_ENVELOPE_VERSION: u32 = 1;
36
/// Clock a stream is timed against.
///
/// Each variant names one timeline a packet can ride; the kernel defines the
/// clock-domain contract as [`Symbol`]s, and this enum is the concrete set this
/// fabric understands. [`ClockDomain::symbol`] maps a variant to its kernel
/// symbol and [`ClockDomain::from_symbol`] parses it back, accepting the bare
/// label, the `clock/<label>` form, and the fully qualified
/// `stream/clock-domain/<label>` form.
///
/// The type is a small `Copy` value with full equality and hashing, so it can
/// be stored directly in sets and used as a map key.
///
/// # Examples
///
/// ```
/// use sim_lib_stream_core::ClockDomain;
///
/// let domain = ClockDomain::Sample;
/// assert_eq!(domain.wire_label(), "sample");
/// let parsed = ClockDomain::from_symbol(&domain.symbol()).unwrap();
/// assert_eq!(parsed, domain);
/// ```
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum ClockDomain {
    /// Per-sample audio timeline (the finest audio clock).
    Sample,
    /// Per-block processing timeline (one tick per audio block).
    Block,
    /// Control-rate timeline for parameter and modulation updates.
    Control,
    /// MIDI tick timeline (musical clock pulses).
    MidiTick,
    /// Wall-clock (real-world) time.
    Wall,
    /// Transport timeline (musical position: bars/beats under play control).
    Transport,
    /// Server-side frame timeline.
    ServerFrame,
    /// Browser-side frame timeline (client render cadence).
    BrowserFrame,
    /// Trace-step timeline for stepped/replayed execution.
    TraceStep,
    /// Job timeline keyed to background job progress.
    Job,
}
79
80impl ClockDomain {
81    /// Returns the stable wire label for this domain (for example `"sample"`).
82    pub fn wire_label(self) -> &'static str {
83        match self {
84            Self::Sample => "sample",
85            Self::Block => "block",
86            Self::Control => "control",
87            Self::MidiTick => "midi-tick",
88            Self::Wall => "wall",
89            Self::Transport => "transport",
90            Self::ServerFrame => "server-frame",
91            Self::BrowserFrame => "browser-frame",
92            Self::TraceStep => "trace-step",
93            Self::Job => "job",
94        }
95    }
96
97    /// Returns the kernel [`Symbol`] for this domain, namespaced under
98    /// `stream/clock-domain`.
99    pub fn symbol(self) -> Symbol {
100        Symbol::qualified("stream/clock-domain", self.wire_label())
101    }
102
103    /// Parses a [`ClockDomain`] from a kernel [`Symbol`].
104    ///
105    /// Accepts the bare label, the legacy `clock/<label>` form, and the fully
106    /// qualified `stream/clock-domain/<label>` form. Returns an error for any
107    /// unrecognized clock domain.
108    pub fn from_symbol(symbol: &Symbol) -> Result<Self> {
109        match symbol.as_qualified_str().as_str() {
110            "sample" | "clock/sample" | "stream/clock-domain/sample" => Ok(Self::Sample),
111            "block" | "clock/block" | "stream/clock-domain/block" => Ok(Self::Block),
112            "control" | "clock/control" | "stream/clock-domain/control" => Ok(Self::Control),
113            "midi"
114            | "midi-tick"
115            | "clock/midi"
116            | "clock/midi-tick"
117            | "stream/clock-domain/midi-tick" => Ok(Self::MidiTick),
118            "wall" | "clock/wall" | "stream/clock-domain/wall" => Ok(Self::Wall),
119            "transport" | "clock/transport" | "stream/clock-domain/transport" => {
120                Ok(Self::Transport)
121            }
122            "server-frame" | "clock/server-frame" | "stream/clock-domain/server-frame" => {
123                Ok(Self::ServerFrame)
124            }
125            "browser-frame" | "clock/browser-frame" | "stream/clock-domain/browser-frame" => {
126                Ok(Self::BrowserFrame)
127            }
128            "trace-step" | "clock/trace-step" | "stream/clock-domain/trace-step" => {
129                Ok(Self::TraceStep)
130            }
131            "job" | "clock/job" | "stream/clock-domain/job" => Ok(Self::Job),
132            other => Err(Error::Eval(format!("unknown stream clock domain {other}"))),
133        }
134    }
135
136    /// Resolves the clock domain for a stream's declared clock symbol, falling
137    /// back to [`ClockDomain::ServerFrame`] when the symbol is unrecognized.
138    pub fn for_stream_clock(symbol: &Symbol) -> Self {
139        Self::from_symbol(symbol).unwrap_or(Self::ServerFrame)
140    }
141}
142
143/// A single packet plus the routing and timing metadata that carries it across
144/// the streaming fabric.
145///
146/// Every envelope is constructed through a validating constructor that checks
147/// its [`Tick`]s, confirms the declared [`StreamMedia`] matches the wrapped
148/// [`StreamPacket`], folds each tick's clock into the recorded clock-domain set,
149/// and stamps the current [`STREAM_ENVELOPE_VERSION`]. The struct fields are
150/// private; read access is through the accessor methods, and the wire form is
151/// produced by [`StreamEnvelope::to_expr`] and recovered by [`TryFrom<Expr>`].
152#[derive(Clone, Debug, PartialEq, Eq)]
153pub struct StreamEnvelope {
154    version: u32,
155    stream_id: Symbol,
156    packet_id: Symbol,
157    media: StreamMedia,
158    direction: StreamDirection,
159    sequence: u64,
160    ticks: Vec<Tick>,
161    clock_domain: ClockDomain,
162    clock_domains: Vec<ClockDomain>,
163    profile: TransportProfile,
164    diagnostics: Vec<Symbol>,
165    packet: StreamPacket,
166}
167
168impl StreamEnvelope {
169    /// Builds an envelope whose clock-domain set is seeded from the single
170    /// primary `clock_domain`.
171    ///
172    /// Validates the ticks and checks that `media` matches the packet's media;
173    /// see [`StreamEnvelope::new_with_clock_domains`] for the full contract.
174    #[allow(clippy::too_many_arguments)]
175    pub fn new(
176        stream_id: Symbol,
177        packet_id: Symbol,
178        media: StreamMedia,
179        direction: StreamDirection,
180        sequence: u64,
181        ticks: Vec<Tick>,
182        clock_domain: ClockDomain,
183        profile: TransportProfile,
184        diagnostics: Vec<Symbol>,
185        packet: StreamPacket,
186    ) -> Result<Self> {
187        Self::new_with_clock_domains(
188            stream_id,
189            packet_id,
190            media,
191            direction,
192            sequence,
193            ticks,
194            clock_domain,
195            vec![clock_domain],
196            profile,
197            diagnostics,
198            packet,
199        )
200    }
201
202    /// Builds an envelope with an explicit set of clock domains.
203    ///
204    /// Validates the ticks via the kernel, requires `media` to equal the
205    /// wrapped packet's media (erroring otherwise), augments `clock_domains`
206    /// with each tick's clock domain, and normalizes the result so the primary
207    /// `clock_domain` leads and no domain repeats. The stored version is always
208    /// [`STREAM_ENVELOPE_VERSION`].
209    #[allow(clippy::too_many_arguments)]
210    pub fn new_with_clock_domains(
211        stream_id: Symbol,
212        packet_id: Symbol,
213        media: StreamMedia,
214        direction: StreamDirection,
215        sequence: u64,
216        ticks: Vec<Tick>,
217        clock_domain: ClockDomain,
218        clock_domains: Vec<ClockDomain>,
219        profile: TransportProfile,
220        diagnostics: Vec<Symbol>,
221        packet: StreamPacket,
222    ) -> Result<Self> {
223        sim_kernel::validate_ticks(&ticks)?;
224        let packet_media = packet.media();
225        if packet_media != media {
226            return Err(Error::Eval(format!(
227                "stream envelope media {} does not match packet media {}",
228                media.symbol(),
229                packet_media.symbol()
230            )));
231        }
232        let mut all_clock_domains = clock_domains;
233        for tick in &ticks {
234            all_clock_domains.push(ClockDomain::from_symbol(&tick.clock)?);
235        }
236        let clock_domains = normalize_clock_domains(clock_domain, all_clock_domains);
237        Ok(Self {
238            version: STREAM_ENVELOPE_VERSION,
239            stream_id,
240            packet_id,
241            media,
242            direction,
243            sequence,
244            ticks,
245            clock_domain,
246            clock_domains,
247            profile,
248            diagnostics,
249            packet,
250        })
251    }
252
253    /// Builds an envelope from stream `metadata` and one [`StreamItem`], using
254    /// the in-memory-local [`TransportProfile`].
255    ///
256    /// Derives the packet id from the stream id and `sequence`, and resolves the
257    /// clock domain from the metadata's declared clock. A convenience wrapper
258    /// over [`StreamEnvelope::from_item_with_profile`].
259    pub fn from_item(metadata: &StreamMetadata, sequence: u64, item: &StreamItem) -> Result<Self> {
260        Self::from_item_with_profile(metadata, sequence, item, TransportProfile::memory_local())
261    }
262
263    /// Builds an envelope from stream `metadata` and one [`StreamItem`] under an
264    /// explicit [`TransportProfile`].
265    ///
266    /// Derives the packet id from the stream id and `sequence`, copies the
267    /// item's ticks and packet, and resolves the clock domain from the
268    /// metadata's clock via [`ClockDomain::for_stream_clock`]. No diagnostics
269    /// are attached.
270    pub fn from_item_with_profile(
271        metadata: &StreamMetadata,
272        sequence: u64,
273        item: &StreamItem,
274        profile: TransportProfile,
275    ) -> Result<Self> {
276        Self::new(
277            metadata.id().clone(),
278            packet_id(metadata.id(), sequence),
279            metadata.media(),
280            metadata.direction(),
281            sequence,
282            item.ticks().to_vec(),
283            ClockDomain::for_stream_clock(metadata.clock()),
284            profile,
285            Vec::new(),
286            item.packet().clone(),
287        )
288    }
289
290    /// Returns the envelope wire version (always [`STREAM_ENVELOPE_VERSION`]).
291    pub fn version(&self) -> u32 {
292        self.version
293    }
294
295    /// Returns the id of the stream this envelope belongs to.
296    pub fn stream_id(&self) -> &Symbol {
297        &self.stream_id
298    }
299
300    /// Returns the id of the wrapped packet.
301    pub fn packet_id(&self) -> &Symbol {
302        &self.packet_id
303    }
304
305    /// Returns the media type carried by this envelope.
306    pub fn media(&self) -> StreamMedia {
307        self.media
308    }
309
310    /// Returns the direction of flow for this envelope.
311    pub fn direction(&self) -> StreamDirection {
312        self.direction
313    }
314
315    /// Returns the monotonic sequence number of this envelope within its stream.
316    pub fn sequence(&self) -> u64 {
317        self.sequence
318    }
319
320    /// Returns the [`Tick`]s that locate this envelope on its clocks.
321    pub fn ticks(&self) -> &[Tick] {
322        &self.ticks
323    }
324
325    /// Returns the primary clock domain this envelope is timed against.
326    pub fn clock_domain(&self) -> ClockDomain {
327        self.clock_domain
328    }
329
330    /// Returns every clock domain this envelope rides.
331    ///
332    /// The primary [`ClockDomain::clock_domain`](StreamEnvelope::clock_domain)
333    /// leads, followed by any additional domains contributed by the ticks, with
334    /// no repeats.
335    pub fn clock_domains(&self) -> &[ClockDomain] {
336        &self.clock_domains
337    }
338
339    /// Returns the transport profile bounding what a carrier may do with this
340    /// envelope.
341    pub fn profile(&self) -> &TransportProfile {
342        &self.profile
343    }
344
345    /// Returns the diagnostic symbols attached to this envelope.
346    pub fn diagnostics(&self) -> &[Symbol] {
347        &self.diagnostics
348    }
349
350    /// Returns the wrapped packet payload.
351    pub fn packet(&self) -> &StreamPacket {
352        &self.packet
353    }
354
355    /// Encodes this envelope into its [`Expr`] map wire form.
356    ///
357    /// The map is tagged with [`stream_envelope_tag_symbol`] and round-trips
358    /// back through the [`TryFrom<Expr>`] implementation.
359    pub fn to_expr(&self) -> Expr {
360        Expr::Map(vec![
361            (
362                Expr::Symbol(Symbol::new("envelope")),
363                Expr::Symbol(stream_envelope_tag_symbol()),
364            ),
365            (
366                Expr::Symbol(Symbol::new("version")),
367                Expr::String(self.version.to_string()),
368            ),
369            (
370                Expr::Symbol(Symbol::new("stream-id")),
371                Expr::Symbol(self.stream_id.clone()),
372            ),
373            (
374                Expr::Symbol(Symbol::new("packet-id")),
375                Expr::Symbol(self.packet_id.clone()),
376            ),
377            (
378                Expr::Symbol(Symbol::new("media")),
379                Expr::Symbol(self.media.symbol()),
380            ),
381            (
382                Expr::Symbol(Symbol::new("direction")),
383                Expr::Symbol(self.direction.symbol()),
384            ),
385            (
386                Expr::Symbol(Symbol::new("sequence")),
387                Expr::String(self.sequence.to_string()),
388            ),
389            (
390                Expr::Symbol(Symbol::new("ticks")),
391                Expr::List(self.ticks.iter().map(tick_expr).collect()),
392            ),
393            (
394                Expr::Symbol(Symbol::new("clock-domain")),
395                Expr::Symbol(self.clock_domain.symbol()),
396            ),
397            (
398                Expr::Symbol(Symbol::new("clock-domains")),
399                Expr::List(
400                    self.clock_domains
401                        .iter()
402                        .map(|domain| Expr::Symbol(domain.symbol()))
403                        .collect(),
404                ),
405            ),
406            (Expr::Symbol(Symbol::new("profile")), self.profile.to_expr()),
407            (
408                Expr::Symbol(Symbol::new("diagnostics")),
409                Expr::List(self.diagnostics.iter().cloned().map(Expr::Symbol).collect()),
410            ),
411            (Expr::Symbol(Symbol::new("packet")), self.packet.to_expr()),
412        ])
413    }
414}
415
416impl TryFrom<Expr> for StreamEnvelope {
417    type Error = Error;
418
419    fn try_from(expr: Expr) -> Result<Self> {
420        let Expr::Map(entries) = &expr else {
421            return Err(Error::TypeMismatch {
422                expected: "stream envelope map",
423                found: expr_kind(&expr),
424            });
425        };
426        ensure_fields(
427            entries,
428            &[
429                "envelope",
430                "version",
431                "stream-id",
432                "packet-id",
433                "media",
434                "direction",
435                "sequence",
436                "ticks",
437                "clock-domain",
438                "clock-domains",
439                "profile",
440                "diagnostics",
441                "packet",
442            ],
443        )?;
444        let tag = symbol_field(entries, "envelope")?;
445        if *tag != stream_envelope_tag_symbol() {
446            return Err(Error::Eval(format!(
447                "unknown stream envelope tag {}",
448                tag.as_qualified_str()
449            )));
450        }
451        let version = parse_string_field::<u32>(entries, "version")?;
452        if version != STREAM_ENVELOPE_VERSION {
453            return Err(Error::Eval(format!(
454                "unsupported stream envelope version {version}"
455            )));
456        }
457        let packet = StreamPacket::try_from(field(entries, "packet")?.clone())?;
458        let ticks = tick_list(entries, "ticks")?;
459        Self::new_with_clock_domains(
460            symbol_field(entries, "stream-id")?.clone(),
461            symbol_field(entries, "packet-id")?.clone(),
462            StreamMedia::from_symbol(symbol_field(entries, "media")?)?,
463            StreamDirection::from_symbol(symbol_field(entries, "direction")?)?,
464            parse_string_field::<u64>(entries, "sequence")?,
465            ticks,
466            ClockDomain::from_symbol(symbol_field(entries, "clock-domain")?)?,
467            clock_domain_list(entries, "clock-domains")?,
468            TransportProfile::from_expr(field(entries, "profile")?)?,
469            symbol_list(entries, "diagnostics")?.to_vec(),
470            packet,
471        )
472    }
473}
474
475/// Returns the runtime tag [`Symbol`] that marks a map as a stream envelope.
476///
477/// Written under the `envelope` key by [`StreamEnvelope::to_expr`] and required
478/// on decode by the [`TryFrom<Expr>`] implementation.
479pub fn stream_envelope_tag_symbol() -> Symbol {
480    Symbol::qualified("stream/envelope", "v1")
481}
482
483fn packet_id(stream_id: &Symbol, sequence: u64) -> Symbol {
484    Symbol::qualified(
485        "stream/packet-id",
486        format!("{}#{sequence}", stream_id.as_qualified_str()),
487    )
488}
489
490fn tick_expr(tick: &Tick) -> Expr {
491    Expr::Map(vec![
492        (
493            Expr::Symbol(Symbol::new("clock")),
494            Expr::Symbol(tick.clock.clone()),
495        ),
496        (Expr::Symbol(Symbol::new("index")), ref_expr(&tick.index)),
497    ])
498}
499
500fn tick_from_expr(expr: &Expr) -> Result<Tick> {
501    let Expr::Map(entries) = expr else {
502        return Err(Error::TypeMismatch {
503            expected: "stream tick map",
504            found: expr_kind(expr),
505        });
506    };
507    ensure_fields(entries, &["clock", "index"])?;
508    Ok(Tick::new(
509        symbol_field(entries, "clock")?.clone(),
510        ref_from_expr(field(entries, "index")?)?,
511    ))
512}
513
514fn parse_string_field<T>(entries: &[(Expr, Expr)], name: &str) -> Result<T>
515where
516    T: FromStr,
517    T::Err: std::fmt::Display,
518{
519    string_field(entries, name)?
520        .parse::<T>()
521        .map_err(|err| Error::Eval(format!("invalid stream envelope {name}: {err}")))
522}
523
524fn tick_list(entries: &[(Expr, Expr)], name: &str) -> Result<Vec<Tick>> {
525    list_field(entries, name)?
526        .iter()
527        .map(tick_from_expr)
528        .collect()
529}
530
531fn clock_domain_list(entries: &[(Expr, Expr)], name: &str) -> Result<Vec<ClockDomain>> {
532    symbol_list(entries, name)?
533        .iter()
534        .map(ClockDomain::from_symbol)
535        .collect()
536}
537
538fn symbol_list(entries: &[(Expr, Expr)], name: &str) -> Result<Vec<Symbol>> {
539    list_field(entries, name)?
540        .iter()
541        .map(|expr| match expr {
542            Expr::Symbol(symbol) => Ok(symbol.clone()),
543            other => Err(Error::TypeMismatch {
544                expected: "symbol list item",
545                found: expr_kind(other),
546            }),
547        })
548        .collect()
549}
550
551fn list_field<'a>(entries: &'a [(Expr, Expr)], name: &str) -> Result<&'a [Expr]> {
552    match field(entries, name)? {
553        Expr::List(items) => Ok(items),
554        other => Err(Error::TypeMismatch {
555            expected: "list field",
556            found: expr_kind(other),
557        }),
558    }
559}
560
561fn ensure_fields(entries: &[(Expr, Expr)], allowed: &[&str]) -> Result<()> {
562    for (key, _) in entries {
563        let Expr::Symbol(symbol) = key else {
564            return Err(Error::TypeMismatch {
565                expected: "symbol stream envelope field",
566                found: expr_kind(key),
567            });
568        };
569        if symbol.namespace.is_none() && allowed.contains(&symbol.name.as_ref()) {
570            continue;
571        }
572        return Err(Error::Eval(format!(
573            "unknown stream envelope field {}",
574            symbol.as_qualified_str()
575        )));
576    }
577    Ok(())
578}
579
580fn normalize_clock_domains(
581    primary: ClockDomain,
582    clock_domains: Vec<ClockDomain>,
583) -> Vec<ClockDomain> {
584    let mut domains = vec![primary];
585    for domain in clock_domains {
586        if !domains.contains(&domain) {
587            domains.push(domain);
588        }
589    }
590    domains
591}