Skip to main content

sim_lib_stream_core/
packet.rs

1//! Packet payloads carried by stream envelopes.
2//!
3//! [`StreamPacket`] is the umbrella over the concrete payload kinds an
4//! envelope can hold: [`PcmPacket`] audio frames, [`MidiPacket`] events,
5//! [`StreamDiagnostic`] messages, and opaque [`DataPacket`] values. Each
6//! payload round-trips to and from a self-describing [`Expr`] map tagged with
7//! a `stream/packet/*` symbol, so packets can be serialized, interned as
8//! kernel data ([`StreamPacket::intern_ref`]), and reconstructed.
9//!
10//! The kernel defines the `Expr`/`Datum`/datum-store contract; this module
11//! supplies the concrete streaming-fabric payload model on top of it.
12
13use std::fmt::Display;
14use std::str::FromStr;
15
16use sim_kernel::{Cx, Datum, DatumStore, Error, Expr, Ref, Result, Symbol};
17
18use crate::buffer::{expr_kind, field, string_field, symbol_field};
19use crate::metadata::StreamMedia;
20
21#[path = "packet/pcm.rs"]
22mod pcm;
23
24pub use pcm::{PcmPacket, PcmSampleFormat};
25
26/// A single timed MIDI event within a [`MidiPacket`].
27///
28/// Holds the event time in ticks, the ticks-per-quarter-note (TPQ) resolution
29/// the ticks are measured against, and the raw MIDI message bytes.
30#[derive(Clone, Debug, PartialEq, Eq)]
31pub struct MidiPacketEvent {
32    ticks: i64,
33    tpq: u16,
34    bytes: Vec<u8>,
35}
36
37impl MidiPacketEvent {
38    /// Builds an event from its tick time, TPQ resolution, and message bytes.
39    ///
40    /// Returns an error when `tpq` is zero, since a zero resolution cannot
41    /// time the event.
42    pub fn new(ticks: i64, tpq: u16, bytes: Vec<u8>) -> Result<Self> {
43        if tpq == 0 {
44            return Err(Error::Eval(
45                "MIDI packet TPQ must be greater than zero".to_owned(),
46            ));
47        }
48        Ok(Self { ticks, tpq, bytes })
49    }
50
51    /// Returns the event time in ticks.
52    pub fn ticks(&self) -> i64 {
53        self.ticks
54    }
55
56    /// Returns the ticks-per-quarter-note resolution the ticks are measured in.
57    pub fn tpq(&self) -> u16 {
58        self.tpq
59    }
60
61    /// Returns the raw MIDI message bytes.
62    pub fn bytes(&self) -> &[u8] {
63        &self.bytes
64    }
65}
66
67/// A MIDI payload: an ordered run of [`MidiPacketEvent`]s sharing one TPQ
68/// resolution.
69#[derive(Clone, Debug, PartialEq, Eq)]
70pub struct MidiPacket {
71    tpq: u16,
72    events: Vec<MidiPacketEvent>,
73}
74
75impl MidiPacket {
76    /// Builds a packet from its events, adopting the TPQ of the first event.
77    ///
78    /// Returns an error when `events` is empty or any event uses a different
79    /// TPQ than the first; a packet carries a single shared resolution.
80    pub fn new(events: Vec<MidiPacketEvent>) -> Result<Self> {
81        let Some(first) = events.first() else {
82            return Err(Error::Eval(
83                "MIDI packet must contain at least one event".to_owned(),
84            ));
85        };
86        let tpq = first.tpq;
87        if events.iter().any(|event| event.tpq != tpq) {
88            return Err(Error::Eval(
89                "MIDI packet events must use one shared TPQ".to_owned(),
90            ));
91        }
92        Ok(Self { tpq, events })
93    }
94
95    /// Returns the shared ticks-per-quarter-note resolution of every event.
96    pub fn tpq(&self) -> u16 {
97        self.tpq
98    }
99
100    /// Returns the packet's events in order.
101    pub fn events(&self) -> &[MidiPacketEvent] {
102        &self.events
103    }
104
105    /// Encodes the packet as a `stream/packet/midi` [`Expr`] map.
106    pub fn to_expr(&self) -> Expr {
107        Expr::Map(vec![
108            (
109                Expr::Symbol(Symbol::new("packet")),
110                Expr::Symbol(Symbol::qualified("stream/packet", "midi")),
111            ),
112            (
113                Expr::Symbol(Symbol::new("tpq")),
114                Expr::String(self.tpq.to_string()),
115            ),
116            (
117                Expr::Symbol(Symbol::new("events")),
118                Expr::List(self.events.iter().map(midi_event_expr).collect()),
119            ),
120        ])
121    }
122}
123
124/// A diagnostic payload: a categorized human-readable message carried
125/// in-band on a stream.
126#[derive(Clone, Debug, PartialEq, Eq)]
127pub struct StreamDiagnostic {
128    kind: Symbol,
129    message: String,
130}
131
132impl StreamDiagnostic {
133    /// Builds a diagnostic from its kind symbol and message text.
134    pub fn new(kind: Symbol, message: impl Into<String>) -> Self {
135        Self {
136            kind,
137            message: message.into(),
138        }
139    }
140
141    /// Returns the symbol categorizing this diagnostic.
142    pub fn kind(&self) -> &Symbol {
143        &self.kind
144    }
145
146    /// Returns the diagnostic message text.
147    pub fn message(&self) -> &str {
148        &self.message
149    }
150
151    /// Encodes the diagnostic as a `stream/packet/diagnostic` [`Expr`] map.
152    pub fn to_expr(&self) -> Expr {
153        Expr::Map(vec![
154            (
155                Expr::Symbol(Symbol::new("packet")),
156                Expr::Symbol(Symbol::qualified("stream/packet", "diagnostic")),
157            ),
158            (
159                Expr::Symbol(Symbol::new("kind")),
160                Expr::Symbol(self.kind.clone()),
161            ),
162            (
163                Expr::Symbol(Symbol::new("message")),
164                Expr::String(self.message.clone()),
165            ),
166        ])
167    }
168}
169
170/// An opaque structured payload: a kind-tagged arbitrary [`Expr`] value.
171///
172/// Used for application-defined traffic the fabric does not interpret, such as
173/// model events and rank frontiers (see [`StreamPacket::model_event`] and
174/// [`StreamPacket::rank_frontier`]).
175#[derive(Clone, Debug, PartialEq, Eq)]
176pub struct DataPacket {
177    /// Symbol categorizing the payload (for example `stream/data/model-event`).
178    pub kind: Symbol,
179    /// The application-defined payload expression.
180    pub payload: Expr,
181}
182
183impl DataPacket {
184    /// Builds a data packet from its kind symbol and payload expression.
185    pub fn new(kind: Symbol, payload: Expr) -> Self {
186        Self { kind, payload }
187    }
188
189    /// Encodes the packet as a `stream/packet/data` [`Expr`] map.
190    pub fn to_expr(&self) -> Expr {
191        Expr::Map(vec![
192            (
193                Expr::Symbol(Symbol::new("packet")),
194                Expr::Symbol(Symbol::qualified("stream/packet", "data")),
195            ),
196            (
197                Expr::Symbol(Symbol::new("kind")),
198                Expr::Symbol(self.kind.clone()),
199            ),
200            (Expr::Symbol(Symbol::new("payload")), self.payload.clone()),
201        ])
202    }
203}
204
205/// Umbrella over every payload kind a stream envelope can carry.
206///
207/// Each variant maps to a [`StreamMedia`] kind and to a `stream/packet/*`
208/// tagged [`Expr`] map. [`TryFrom<Expr>`](StreamPacket#impl-TryFrom<Expr>-for-StreamPacket)
209/// reconstructs a packet from that encoding.
210#[derive(Clone, Debug, PartialEq, Eq)]
211pub enum StreamPacket {
212    /// Real-time PCM audio frames.
213    Pcm(PcmPacket),
214    /// Timed MIDI events.
215    Midi(MidiPacket),
216    /// In-band diagnostic message.
217    Diagnostic(StreamDiagnostic),
218    /// Opaque application-defined data.
219    Data(DataPacket),
220}
221
222impl StreamPacket {
223    /// Returns the [`StreamMedia`] kind this payload belongs to.
224    pub fn media(&self) -> StreamMedia {
225        match self {
226            Self::Pcm(_) => StreamMedia::Pcm,
227            Self::Midi(_) => StreamMedia::Midi,
228            Self::Diagnostic(_) => StreamMedia::Diagnostic,
229            Self::Data(_) => StreamMedia::Data,
230        }
231    }
232
233    /// Builds a [`StreamPacket::Data`] payload from a kind symbol and payload
234    /// expression.
235    pub fn data(kind: Symbol, payload: Expr) -> Self {
236        Self::Data(DataPacket::new(kind, payload))
237    }
238
239    /// Builds a `stream/data/model-event` data packet around `payload`.
240    pub fn model_event(payload: Expr) -> Self {
241        Self::data(Symbol::qualified("stream/data", "model-event"), payload)
242    }
243
244    /// Builds a `stream/data/rank-frontier` data packet around `payload`.
245    pub fn rank_frontier(payload: Expr) -> Self {
246        Self::data(Symbol::qualified("stream/data", "rank-frontier"), payload)
247    }
248
249    /// Encodes the packet as its variant's `stream/packet/*` [`Expr`] map.
250    pub fn to_expr(&self) -> Expr {
251        match self {
252            Self::Pcm(packet) => packet.to_expr(),
253            Self::Midi(packet) => packet.to_expr(),
254            Self::Diagnostic(packet) => packet.to_expr(),
255            Self::Data(packet) => packet.to_expr(),
256        }
257    }
258
259    /// Interns the packet's encoded form into the runtime datum store and
260    /// returns a content [`Ref`] to it.
261    ///
262    /// The kernel owns the datum store and content-addressing; this turns the
263    /// packet's [`Expr`] encoding into an interned [`Datum`].
264    pub fn intern_ref(&self, cx: &mut Cx) -> Result<Ref> {
265        let datum = Datum::try_from(self.to_expr())?;
266        cx.datum_store_mut().intern(datum).map(Ref::Content)
267    }
268}
269
270impl TryFrom<Expr> for StreamPacket {
271    type Error = Error;
272
273    fn try_from(expr: Expr) -> Result<Self> {
274        let Expr::Map(entries) = &expr else {
275            return Err(Error::TypeMismatch {
276                expected: "stream packet map",
277                found: expr_kind(&expr),
278            });
279        };
280        let packet = packet_symbol(entries)?;
281        match packet.as_qualified_str().as_str() {
282            "stream/packet/pcm" => PcmPacket::from_entries(entries).map(Self::Pcm),
283            "stream/packet/midi" => MidiPacket::from_entries(entries).map(Self::Midi),
284            "stream/packet/diagnostic" => {
285                StreamDiagnostic::from_entries(entries).map(Self::Diagnostic)
286            }
287            "stream/packet/data" => DataPacket::from_entries(entries).map(Self::Data),
288            other => Err(Error::Eval(format!("unknown stream packet kind {other}"))),
289        }
290    }
291}
292
293impl MidiPacket {
294    fn from_entries(entries: &[(Expr, Expr)]) -> Result<Self> {
295        let tpq = parse_string_field::<u16>(entries, "tpq")?;
296        let events = list_field(entries, "events")?
297            .iter()
298            .enumerate()
299            .map(|(index, expr)| {
300                let event = MidiPacketEvent::from_expr(expr)?;
301                if event.tpq() != tpq {
302                    return Err(Error::Eval(format!(
303                        "MIDI packet event {index} TPQ {} does not match packet TPQ {tpq}",
304                        event.tpq()
305                    )));
306                }
307                Ok(event)
308            })
309            .collect::<Result<Vec<_>>>()?;
310        Self::new(events)
311    }
312}
313
314impl MidiPacketEvent {
315    fn from_expr(expr: &Expr) -> Result<Self> {
316        let Expr::Map(entries) = expr else {
317            return Err(Error::TypeMismatch {
318                expected: "MIDI packet event map",
319                found: expr_kind(expr),
320            });
321        };
322        let ticks = parse_string_field::<i64>(entries, "ticks")?;
323        let tpq = parse_string_field::<u16>(entries, "tpq")?;
324        let bytes = bytes_field(entries, "bytes")?.to_vec();
325        Self::new(ticks, tpq, bytes)
326    }
327}
328
329impl StreamDiagnostic {
330    fn from_entries(entries: &[(Expr, Expr)]) -> Result<Self> {
331        Ok(Self::new(
332            symbol_field(entries, "kind")?.clone(),
333            string_field(entries, "message")?.to_owned(),
334        ))
335    }
336}
337
338impl DataPacket {
339    fn from_entries(entries: &[(Expr, Expr)]) -> Result<Self> {
340        ensure_data_fields_closed(entries)?;
341        Ok(Self::new(
342            symbol_field(entries, "kind")?.clone(),
343            field(entries, "payload")?.clone(),
344        ))
345    }
346}
347
348fn packet_symbol(entries: &[(Expr, Expr)]) -> Result<&Symbol> {
349    entries
350        .iter()
351        .find_map(|(key, value)| match (key, value) {
352            (Expr::Symbol(key), Expr::Symbol(value)) if key.name.as_ref() == "packet" => {
353                Some(value)
354            }
355            _ => None,
356        })
357        .ok_or_else(|| Error::Eval("stream packet missing packet symbol".to_owned()))
358}
359
360fn ensure_data_fields_closed(entries: &[(Expr, Expr)]) -> Result<()> {
361    for (key, _) in entries {
362        let Expr::Symbol(symbol) = key else {
363            return Err(Error::TypeMismatch {
364                expected: "symbol data packet field",
365                found: expr_kind(key),
366            });
367        };
368        if symbol.namespace.is_none()
369            && matches!(symbol.name.as_ref(), "packet" | "kind" | "payload")
370        {
371            continue;
372        }
373        return Err(Error::Eval(format!(
374            "unknown data packet field {}",
375            symbol.as_qualified_str()
376        )));
377    }
378    Ok(())
379}
380
381pub(super) fn parse_string_field<T>(entries: &[(Expr, Expr)], name: &str) -> Result<T>
382where
383    T: FromStr,
384    T::Err: Display,
385{
386    string_field(entries, name)?
387        .parse::<T>()
388        .map_err(|err| Error::Eval(format!("invalid stream packet {name}: {err}")))
389}
390
391pub(super) fn parse_string_expr<T>(expr: &Expr, expected: &'static str) -> Result<T>
392where
393    T: FromStr,
394    T::Err: Display,
395{
396    match expr {
397        Expr::String(value) => value
398            .parse::<T>()
399            .map_err(|err| Error::Eval(format!("{expected} parse failed: {err}"))),
400        other => Err(Error::TypeMismatch {
401            expected,
402            found: expr_kind(other),
403        }),
404    }
405}
406
407pub(super) fn list_field<'a>(entries: &'a [(Expr, Expr)], name: &str) -> Result<&'a [Expr]> {
408    match field(entries, name)? {
409        Expr::List(items) => Ok(items),
410        other => Err(Error::TypeMismatch {
411            expected: "list field",
412            found: expr_kind(other),
413        }),
414    }
415}
416
417fn bytes_field<'a>(entries: &'a [(Expr, Expr)], name: &str) -> Result<&'a [u8]> {
418    match field(entries, name)? {
419        Expr::Bytes(bytes) => Ok(bytes),
420        other => Err(Error::TypeMismatch {
421            expected: "bytes field",
422            found: expr_kind(other),
423        }),
424    }
425}
426
427fn midi_event_expr(event: &MidiPacketEvent) -> Expr {
428    Expr::Map(vec![
429        (
430            Expr::Symbol(Symbol::new("ticks")),
431            Expr::String(event.ticks.to_string()),
432        ),
433        (
434            Expr::Symbol(Symbol::new("tpq")),
435            Expr::String(event.tpq.to_string()),
436        ),
437        (
438            Expr::Symbol(Symbol::new("bytes")),
439            Expr::Bytes(event.bytes.clone()),
440        ),
441    ])
442}