Skip to main content

sim_lib_stream_core/
metadata.rs

1//! Stream metadata values and their publication into the runtime claim store.
2//!
3//! This module supplies the concrete description of a stream's identity:
4//! [`StreamMetadata`] bundles a stream id, its [`StreamMedia`] kind,
5//! [`StreamDirection`], clock symbol, and buffer policy. [`RateContract`]
6//! captures the clock-domain/latency/rate agreement two ports must share to
7//! connect.
8//!
9//! The kernel defines the claim/fact contract and the clock-domain surface;
10//! this module supplies the streaming-fabric behavior on top of it. The
11//! `stream_*_predicate` helpers name the predicate symbols, and
12//! [`publish_metadata_claims`] writes a stream's metadata into the runtime as
13//! public facts so other libraries can query a stream's shape.
14
15use sim_kernel::{
16    Claim, ClaimPattern, Cx, Error, Expr, Ref, Result, Symbol, stream_surface,
17    stream_surface::publish_stream_metadata_claims,
18};
19
20use crate::buffer::{BufferPolicy, expr_kind, field, string_field, symbol_field};
21use crate::{ClockDomain, LatencyClass};
22
23/// Media kind carried by a stream.
24///
25/// Selects which packet profile an envelope on the stream is expected to
26/// carry and which `stream/media/*` symbol identifies the stream in the
27/// claim store.
28#[derive(Clone, Copy, Debug, PartialEq, Eq)]
29pub enum StreamMedia {
30    /// Real-time PCM audio frames (see [`PcmPacket`](crate::PcmPacket)).
31    Pcm,
32    /// MIDI events (see [`MidiPacket`](crate::MidiPacket)).
33    Midi,
34    /// Diagnostic messages (see [`StreamDiagnostic`](crate::StreamDiagnostic)).
35    Diagnostic,
36    /// Opaque structured data payloads (see [`DataPacket`](crate::DataPacket)).
37    Data,
38}
39
40impl StreamMedia {
41    /// Returns the `stream/media/*` symbol identifying this media kind.
42    pub fn symbol(self) -> Symbol {
43        match self {
44            Self::Pcm => Symbol::qualified("stream/media", "pcm"),
45            Self::Midi => Symbol::qualified("stream/media", "midi"),
46            Self::Diagnostic => Symbol::qualified("stream/media", "diagnostic"),
47            Self::Data => Symbol::qualified("stream/media", "data"),
48        }
49    }
50
51    /// Parses a [`StreamMedia`] from its `stream/media/*` symbol.
52    ///
53    /// Returns an error for any symbol outside the known media kinds.
54    pub fn from_symbol(symbol: &Symbol) -> Result<Self> {
55        match symbol.as_qualified_str().as_str() {
56            "stream/media/pcm" => Ok(Self::Pcm),
57            "stream/media/midi" => Ok(Self::Midi),
58            "stream/media/diagnostic" => Ok(Self::Diagnostic),
59            "stream/media/data" => Ok(Self::Data),
60            other => Err(Error::Eval(format!("unknown stream media {other}"))),
61        }
62    }
63}
64
65/// Flow direction of a stream relative to its owner.
66#[derive(Clone, Copy, Debug, PartialEq, Eq)]
67pub enum StreamDirection {
68    /// Produces envelopes (output).
69    Source,
70    /// Consumes envelopes (input).
71    Sink,
72    /// Both produces and consumes envelopes.
73    Duplex,
74}
75
76impl StreamDirection {
77    /// Returns the `stream/direction/*` symbol identifying this direction.
78    pub fn symbol(self) -> Symbol {
79        match self {
80            Self::Source => Symbol::qualified("stream/direction", "source"),
81            Self::Sink => Symbol::qualified("stream/direction", "sink"),
82            Self::Duplex => Symbol::qualified("stream/direction", "duplex"),
83        }
84    }
85
86    /// Parses a [`StreamDirection`] from its `stream/direction/*` symbol.
87    ///
88    /// Returns an error for any symbol outside the known directions.
89    pub fn from_symbol(symbol: &Symbol) -> Result<Self> {
90        match symbol.as_qualified_str().as_str() {
91            "stream/direction/source" => Ok(Self::Source),
92            "stream/direction/sink" => Ok(Self::Sink),
93            "stream/direction/duplex" => Ok(Self::Duplex),
94            other => Err(Error::Eval(format!("unknown stream direction {other}"))),
95        }
96    }
97}
98
99/// Timing agreement a stream port advertises and must share to connect.
100///
101/// Pairs a [`ClockDomain`] with a [`LatencyClass`] and an optional nominal
102/// sample rate in hertz. Two ports may be wired together only when their
103/// contracts are compatible (see [`is_compatible_with`](RateContract::is_compatible_with)).
104#[derive(Clone, Copy, Debug, PartialEq, Eq)]
105pub struct RateContract {
106    clock_domain: ClockDomain,
107    latency_class: LatencyClass,
108    nominal_rate_hz: Option<u32>,
109}
110
111impl RateContract {
112    /// Builds a contract from an explicit clock domain, latency class, and
113    /// optional nominal rate.
114    pub fn new(
115        clock_domain: ClockDomain,
116        latency_class: LatencyClass,
117        nominal_rate_hz: Option<u32>,
118    ) -> Self {
119        Self {
120            clock_domain,
121            latency_class,
122            nominal_rate_hz,
123        }
124    }
125
126    /// Contract for sample-exact audio: sample clock domain, sample-exact
127    /// latency, and the given nominal rate.
128    pub fn sample_exact(nominal_rate_hz: Option<u32>) -> Self {
129        Self::new(
130            ClockDomain::Sample,
131            LatencyClass::SampleExact,
132            nominal_rate_hz,
133        )
134    }
135
136    /// Contract for block-local processing: block clock domain and block-local
137    /// latency, with no fixed nominal rate.
138    pub fn block_local() -> Self {
139        Self::new(ClockDomain::Block, LatencyClass::BlockLocal, None)
140    }
141
142    /// Contract for interactive control traffic: control clock domain and
143    /// interactive latency.
144    pub fn control() -> Self {
145        Self::new(ClockDomain::Control, LatencyClass::Interactive, None)
146    }
147
148    /// Contract for MIDI-tick traffic: MIDI-tick clock domain and interactive
149    /// latency.
150    pub fn midi_tick() -> Self {
151        Self::new(ClockDomain::MidiTick, LatencyClass::Interactive, None)
152    }
153
154    /// Contract for offline trace stepping: trace-step clock domain and
155    /// offline-render latency.
156    pub fn trace_step() -> Self {
157        Self::new(ClockDomain::TraceStep, LatencyClass::OfflineRender, None)
158    }
159
160    /// Returns the clock domain this contract runs in.
161    pub fn clock_domain(self) -> ClockDomain {
162        self.clock_domain
163    }
164
165    /// Returns the latency class this contract promises.
166    pub fn latency_class(self) -> LatencyClass {
167        self.latency_class
168    }
169
170    /// Returns the nominal sample rate in hertz, if one is fixed.
171    pub fn nominal_rate_hz(self) -> Option<u32> {
172        self.nominal_rate_hz
173    }
174
175    /// Reports whether `self` and `other` may be connected.
176    ///
177    /// Compatible means matching clock domain and latency class; nominal rates
178    /// must agree only when both are fixed (an unset rate matches any rate).
179    pub fn is_compatible_with(self, other: Self) -> bool {
180        self.clock_domain == other.clock_domain
181            && self.latency_class == other.latency_class
182            && rates_are_compatible(self.nominal_rate_hz, other.nominal_rate_hz)
183    }
184
185    /// Checks compatibility with `other`, returning a descriptive error when
186    /// the two contracts cannot be connected.
187    pub fn ensure_compatible(self, other: Self) -> Result<()> {
188        if self.is_compatible_with(other) {
189            return Ok(());
190        }
191        Err(Error::Eval(format!(
192            "incompatible port rate contracts: source {} {} {:?}, target {} {} {:?}",
193            self.clock_domain.wire_label(),
194            self.latency_class.wire_label(),
195            self.nominal_rate_hz,
196            other.clock_domain.wire_label(),
197            other.latency_class.wire_label(),
198            other.nominal_rate_hz
199        )))
200    }
201}
202
203fn rates_are_compatible(left: Option<u32>, right: Option<u32>) -> bool {
204    match (left, right) {
205        (Some(left), Some(right)) => left == right,
206        _ => true,
207    }
208}
209
210/// Full identity of a stream: id, media kind, direction, clock, and buffer
211/// policy.
212///
213/// This is the value other libraries inspect to learn a stream's shape, and
214/// the value [`publish_metadata_claims`] turns into runtime facts. It
215/// round-trips to and from an [`Expr`] map (`table_expr`/`from_table_expr`)
216/// and to constructor arguments (`to_constructor_args`/`from_constructor_args`).
217#[derive(Clone, Debug, PartialEq, Eq)]
218pub struct StreamMetadata {
219    id: Symbol,
220    media: StreamMedia,
221    direction: StreamDirection,
222    clock: Symbol,
223    buffer: BufferPolicy,
224}
225
226impl StreamMetadata {
227    /// Builds metadata from its id, media kind, direction, clock symbol, and
228    /// buffer policy.
229    pub fn new(
230        id: Symbol,
231        media: StreamMedia,
232        direction: StreamDirection,
233        clock: Symbol,
234        buffer: BufferPolicy,
235    ) -> Self {
236        Self {
237            id,
238            media,
239            direction,
240            clock,
241            buffer,
242        }
243    }
244
245    /// Returns the stream's identity symbol.
246    pub fn id(&self) -> &Symbol {
247        &self.id
248    }
249
250    /// Returns the stream's media kind.
251    pub fn media(&self) -> StreamMedia {
252        self.media
253    }
254
255    /// Returns the stream's flow direction.
256    pub fn direction(&self) -> StreamDirection {
257        self.direction
258    }
259
260    /// Returns the symbol naming the stream's clock.
261    pub fn clock(&self) -> &Symbol {
262        &self.clock
263    }
264
265    /// Returns the stream's buffer policy.
266    pub fn buffer(&self) -> &BufferPolicy {
267        &self.buffer
268    }
269
270    /// Returns the claim-store subject reference for this stream (its id as a
271    /// symbol ref).
272    pub fn subject_ref(&self) -> Ref {
273        Ref::Symbol(self.id.clone())
274    }
275
276    /// Encodes the metadata as the ordered constructor argument expressions
277    /// accepted by [`from_constructor_args`](StreamMetadata::from_constructor_args).
278    pub fn to_constructor_args(&self) -> Vec<Expr> {
279        vec![
280            Expr::Symbol(self.id.clone()),
281            Expr::Symbol(self.media.symbol()),
282            Expr::Symbol(self.direction.symbol()),
283            Expr::Symbol(self.clock.clone()),
284            self.buffer.to_expr(),
285        ]
286    }
287
288    /// Rebuilds metadata from the five constructor argument expressions
289    /// produced by [`to_constructor_args`](StreamMetadata::to_constructor_args).
290    ///
291    /// Returns an error when the argument count or any argument shape is wrong.
292    pub fn from_constructor_args(args: Vec<Expr>) -> Result<Self> {
293        let [id, media, direction, clock, buffer] = args.as_slice() else {
294            return Err(Error::Eval(
295                "stream/Metadata expects five constructor arguments".to_owned(),
296            ));
297        };
298        Ok(Self::new(
299            symbol_expr(id, "stream id")?,
300            StreamMedia::from_symbol(symbol_expr_ref(media, "stream media")?)?,
301            StreamDirection::from_symbol(symbol_expr_ref(direction, "stream direction")?)?,
302            symbol_expr(clock, "stream clock")?,
303            BufferPolicy::from_expr(buffer)?,
304        ))
305    }
306
307    /// Encodes the metadata as a self-describing `Expr` map keyed by field
308    /// name (`kind`, `id`, `media`, `direction`, `clock`, `buffer`).
309    pub fn table_expr(&self) -> Expr {
310        Expr::Map(vec![
311            (
312                Expr::Symbol(Symbol::new("kind")),
313                Expr::Symbol(stream_surface::stream_kind()),
314            ),
315            (
316                Expr::Symbol(Symbol::new("id")),
317                Expr::String(self.id.to_string()),
318            ),
319            (
320                Expr::Symbol(Symbol::new("media")),
321                Expr::Symbol(self.media.symbol()),
322            ),
323            (
324                Expr::Symbol(Symbol::new("direction")),
325                Expr::Symbol(self.direction.symbol()),
326            ),
327            (
328                Expr::Symbol(Symbol::new("clock")),
329                Expr::Symbol(self.clock.clone()),
330            ),
331            (Expr::Symbol(Symbol::new("buffer")), self.buffer.to_expr()),
332        ])
333    }
334
335    /// Rebuilds metadata from the `Expr` map produced by
336    /// [`table_expr`](StreamMetadata::table_expr).
337    ///
338    /// Returns an error when the expression is not a map or a required field
339    /// is missing or malformed.
340    pub fn from_table_expr(expr: &Expr) -> Result<Self> {
341        let Expr::Map(entries) = expr else {
342            return Err(Error::TypeMismatch {
343                expected: "stream metadata map",
344                found: expr_kind(expr),
345            });
346        };
347        Ok(Self::new(
348            Symbol::new(string_field(entries, "id")?.to_owned()),
349            StreamMedia::from_symbol(symbol_field(entries, "media")?)?,
350            StreamDirection::from_symbol(symbol_field(entries, "direction")?)?,
351            symbol_field(entries, "clock")?.clone(),
352            BufferPolicy::from_expr(field(entries, "buffer")?)?,
353        ))
354    }
355}
356
357/// Returns the `stream/id` predicate symbol used for stream-identity facts.
358pub fn stream_id_predicate() -> Symbol {
359    Symbol::qualified("stream", "id")
360}
361
362/// Returns the `stream/media` predicate symbol used for media-kind facts.
363pub fn stream_media_predicate() -> Symbol {
364    Symbol::qualified("stream", "media")
365}
366
367/// Returns the `stream/direction` predicate symbol used for direction facts.
368pub fn stream_direction_predicate() -> Symbol {
369    Symbol::qualified("stream", "direction")
370}
371
372/// Returns the `stream/buffer` predicate symbol used for buffer-policy facts.
373pub fn stream_buffer_predicate() -> Symbol {
374    Symbol::qualified("stream", "buffer")
375}
376
377/// Publishes a stream's metadata into the runtime as public facts about
378/// `subject`.
379///
380/// Writes one claim per field (id, media, direction, clock, buffer) through
381/// the kernel's stream-metadata claim surface, then records the default
382/// in-memory transport fact once. The kernel owns the claim/fact contract;
383/// this function supplies the streaming-fabric mapping from [`StreamMetadata`]
384/// onto it. The transport fact is inserted only if not already present.
385pub fn publish_metadata_claims(cx: &mut Cx, subject: Ref, metadata: &StreamMetadata) -> Result<()> {
386    publish_stream_metadata_claims(
387        cx,
388        subject.clone(),
389        [
390            (stream_id_predicate(), Ref::Symbol(metadata.id.clone())),
391            (
392                stream_media_predicate(),
393                Ref::Symbol(metadata.media.symbol()),
394            ),
395            (
396                stream_direction_predicate(),
397                Ref::Symbol(metadata.direction.symbol()),
398            ),
399            (
400                stream_surface::stream_clock_predicate(),
401                Ref::Symbol(metadata.clock.clone()),
402            ),
403            (
404                stream_buffer_predicate(),
405                Ref::Symbol(metadata.buffer.symbol()),
406            ),
407        ],
408    )?;
409    insert_once(
410        cx,
411        subject,
412        stream_surface::stream_transport_predicate(),
413        Ref::Symbol(Symbol::qualified("stream", "memory")),
414    )
415}
416
417fn insert_once(cx: &mut Cx, subject: Ref, predicate: Symbol, object: Ref) -> Result<()> {
418    let exists = !cx
419        .query_facts(ClaimPattern::exact(
420            subject.clone(),
421            predicate.clone(),
422            object.clone(),
423        ))?
424        .is_empty();
425    if !exists {
426        cx.insert_fact(Claim::public(subject, predicate, object))?;
427    }
428    Ok(())
429}
430
431fn symbol_expr(expr: &Expr, expected: &'static str) -> Result<Symbol> {
432    Ok(symbol_expr_ref(expr, expected)?.clone())
433}
434
435fn symbol_expr_ref<'a>(expr: &'a Expr, expected: &'static str) -> Result<&'a Symbol> {
436    match expr {
437        Expr::Symbol(symbol) => Ok(symbol),
438        other => Err(Error::TypeMismatch {
439            expected,
440            found: expr_kind(other),
441        }),
442    }
443}