1use std::str::FromStr;
18
19#[path = "envelope/profile.rs"]
20mod profile;
21#[path = "envelope/ref_codec.rs"]
22mod ref_codec;
23
24use sim_kernel::{Error, Expr, Result, Symbol, Tick};
25
26use crate::buffer::{expr_kind, field, string_field, symbol_field};
27use crate::{StreamDirection, StreamItem, StreamMedia, StreamMetadata, StreamPacket};
28pub use profile::{LatencyClass, StreamCapability, TransportProfile};
29use ref_codec::{ref_expr, ref_from_expr};
30
/// Format version stamped into every [`StreamEnvelope`] built by this module.
///
/// Decoding through `TryFrom<Expr>` rejects any envelope whose `version`
/// field parses to a different value.
pub const STREAM_ENVELOPE_VERSION: u32 = 1;
36
/// Names the clock that a stream tick or a whole envelope is measured against.
///
/// Every variant has a stable textual label, listed on the variant and
/// returned by [`ClockDomain::wire_label`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClockDomain {
    /// Wire label `sample`.
    Sample,
    /// Wire label `block`.
    Block,
    /// Wire label `control`.
    Control,
    /// Wire label `midi-tick`.
    MidiTick,
    /// Wire label `wall`.
    Wall,
    /// Wire label `transport`.
    Transport,
    /// Wire label `server-frame`.
    ServerFrame,
    /// Wire label `browser-frame`.
    BrowserFrame,
    /// Wire label `trace-step`.
    TraceStep,
    /// Wire label `job`.
    Job,
}
79
80impl ClockDomain {
81 pub fn wire_label(self) -> &'static str {
83 match self {
84 Self::Sample => "sample",
85 Self::Block => "block",
86 Self::Control => "control",
87 Self::MidiTick => "midi-tick",
88 Self::Wall => "wall",
89 Self::Transport => "transport",
90 Self::ServerFrame => "server-frame",
91 Self::BrowserFrame => "browser-frame",
92 Self::TraceStep => "trace-step",
93 Self::Job => "job",
94 }
95 }
96
97 pub fn symbol(self) -> Symbol {
100 Symbol::qualified("stream/clock-domain", self.wire_label())
101 }
102
103 pub fn from_symbol(symbol: &Symbol) -> Result<Self> {
109 match symbol.as_qualified_str().as_str() {
110 "sample" | "clock/sample" | "stream/clock-domain/sample" => Ok(Self::Sample),
111 "block" | "clock/block" | "stream/clock-domain/block" => Ok(Self::Block),
112 "control" | "clock/control" | "stream/clock-domain/control" => Ok(Self::Control),
113 "midi"
114 | "midi-tick"
115 | "clock/midi"
116 | "clock/midi-tick"
117 | "stream/clock-domain/midi-tick" => Ok(Self::MidiTick),
118 "wall" | "clock/wall" | "stream/clock-domain/wall" => Ok(Self::Wall),
119 "transport" | "clock/transport" | "stream/clock-domain/transport" => {
120 Ok(Self::Transport)
121 }
122 "server-frame" | "clock/server-frame" | "stream/clock-domain/server-frame" => {
123 Ok(Self::ServerFrame)
124 }
125 "browser-frame" | "clock/browser-frame" | "stream/clock-domain/browser-frame" => {
126 Ok(Self::BrowserFrame)
127 }
128 "trace-step" | "clock/trace-step" | "stream/clock-domain/trace-step" => {
129 Ok(Self::TraceStep)
130 }
131 "job" | "clock/job" | "stream/clock-domain/job" => Ok(Self::Job),
132 other => Err(Error::Eval(format!("unknown stream clock domain {other}"))),
133 }
134 }
135
136 pub fn for_stream_clock(symbol: &Symbol) -> Self {
139 Self::from_symbol(symbol).unwrap_or(Self::ServerFrame)
140 }
141}
142
143#[derive(Clone, Debug, PartialEq, Eq)]
153pub struct StreamEnvelope {
154 version: u32,
155 stream_id: Symbol,
156 packet_id: Symbol,
157 media: StreamMedia,
158 direction: StreamDirection,
159 sequence: u64,
160 ticks: Vec<Tick>,
161 clock_domain: ClockDomain,
162 clock_domains: Vec<ClockDomain>,
163 profile: TransportProfile,
164 diagnostics: Vec<Symbol>,
165 packet: StreamPacket,
166}
167
168impl StreamEnvelope {
169 #[allow(clippy::too_many_arguments)]
175 pub fn new(
176 stream_id: Symbol,
177 packet_id: Symbol,
178 media: StreamMedia,
179 direction: StreamDirection,
180 sequence: u64,
181 ticks: Vec<Tick>,
182 clock_domain: ClockDomain,
183 profile: TransportProfile,
184 diagnostics: Vec<Symbol>,
185 packet: StreamPacket,
186 ) -> Result<Self> {
187 Self::new_with_clock_domains(
188 stream_id,
189 packet_id,
190 media,
191 direction,
192 sequence,
193 ticks,
194 clock_domain,
195 vec![clock_domain],
196 profile,
197 diagnostics,
198 packet,
199 )
200 }
201
202 #[allow(clippy::too_many_arguments)]
210 pub fn new_with_clock_domains(
211 stream_id: Symbol,
212 packet_id: Symbol,
213 media: StreamMedia,
214 direction: StreamDirection,
215 sequence: u64,
216 ticks: Vec<Tick>,
217 clock_domain: ClockDomain,
218 clock_domains: Vec<ClockDomain>,
219 profile: TransportProfile,
220 diagnostics: Vec<Symbol>,
221 packet: StreamPacket,
222 ) -> Result<Self> {
223 sim_kernel::validate_ticks(&ticks)?;
224 let packet_media = packet.media();
225 if packet_media != media {
226 return Err(Error::Eval(format!(
227 "stream envelope media {} does not match packet media {}",
228 media.symbol(),
229 packet_media.symbol()
230 )));
231 }
232 let mut all_clock_domains = clock_domains;
233 for tick in &ticks {
234 all_clock_domains.push(ClockDomain::from_symbol(&tick.clock)?);
235 }
236 let clock_domains = normalize_clock_domains(clock_domain, all_clock_domains);
237 Ok(Self {
238 version: STREAM_ENVELOPE_VERSION,
239 stream_id,
240 packet_id,
241 media,
242 direction,
243 sequence,
244 ticks,
245 clock_domain,
246 clock_domains,
247 profile,
248 diagnostics,
249 packet,
250 })
251 }
252
253 pub fn from_item(metadata: &StreamMetadata, sequence: u64, item: &StreamItem) -> Result<Self> {
260 Self::from_item_with_profile(metadata, sequence, item, TransportProfile::memory_local())
261 }
262
263 pub fn from_item_with_profile(
271 metadata: &StreamMetadata,
272 sequence: u64,
273 item: &StreamItem,
274 profile: TransportProfile,
275 ) -> Result<Self> {
276 Self::new(
277 metadata.id().clone(),
278 packet_id(metadata.id(), sequence),
279 metadata.media(),
280 metadata.direction(),
281 sequence,
282 item.ticks().to_vec(),
283 ClockDomain::for_stream_clock(metadata.clock()),
284 profile,
285 Vec::new(),
286 item.packet().clone(),
287 )
288 }
289
290 pub fn version(&self) -> u32 {
292 self.version
293 }
294
295 pub fn stream_id(&self) -> &Symbol {
297 &self.stream_id
298 }
299
300 pub fn packet_id(&self) -> &Symbol {
302 &self.packet_id
303 }
304
305 pub fn media(&self) -> StreamMedia {
307 self.media
308 }
309
310 pub fn direction(&self) -> StreamDirection {
312 self.direction
313 }
314
315 pub fn sequence(&self) -> u64 {
317 self.sequence
318 }
319
320 pub fn ticks(&self) -> &[Tick] {
322 &self.ticks
323 }
324
325 pub fn clock_domain(&self) -> ClockDomain {
327 self.clock_domain
328 }
329
330 pub fn clock_domains(&self) -> &[ClockDomain] {
336 &self.clock_domains
337 }
338
339 pub fn profile(&self) -> &TransportProfile {
342 &self.profile
343 }
344
345 pub fn diagnostics(&self) -> &[Symbol] {
347 &self.diagnostics
348 }
349
350 pub fn packet(&self) -> &StreamPacket {
352 &self.packet
353 }
354
355 pub fn to_expr(&self) -> Expr {
360 Expr::Map(vec![
361 (
362 Expr::Symbol(Symbol::new("envelope")),
363 Expr::Symbol(stream_envelope_tag_symbol()),
364 ),
365 (
366 Expr::Symbol(Symbol::new("version")),
367 Expr::String(self.version.to_string()),
368 ),
369 (
370 Expr::Symbol(Symbol::new("stream-id")),
371 Expr::Symbol(self.stream_id.clone()),
372 ),
373 (
374 Expr::Symbol(Symbol::new("packet-id")),
375 Expr::Symbol(self.packet_id.clone()),
376 ),
377 (
378 Expr::Symbol(Symbol::new("media")),
379 Expr::Symbol(self.media.symbol()),
380 ),
381 (
382 Expr::Symbol(Symbol::new("direction")),
383 Expr::Symbol(self.direction.symbol()),
384 ),
385 (
386 Expr::Symbol(Symbol::new("sequence")),
387 Expr::String(self.sequence.to_string()),
388 ),
389 (
390 Expr::Symbol(Symbol::new("ticks")),
391 Expr::List(self.ticks.iter().map(tick_expr).collect()),
392 ),
393 (
394 Expr::Symbol(Symbol::new("clock-domain")),
395 Expr::Symbol(self.clock_domain.symbol()),
396 ),
397 (
398 Expr::Symbol(Symbol::new("clock-domains")),
399 Expr::List(
400 self.clock_domains
401 .iter()
402 .map(|domain| Expr::Symbol(domain.symbol()))
403 .collect(),
404 ),
405 ),
406 (Expr::Symbol(Symbol::new("profile")), self.profile.to_expr()),
407 (
408 Expr::Symbol(Symbol::new("diagnostics")),
409 Expr::List(self.diagnostics.iter().cloned().map(Expr::Symbol).collect()),
410 ),
411 (Expr::Symbol(Symbol::new("packet")), self.packet.to_expr()),
412 ])
413 }
414}
415
416impl TryFrom<Expr> for StreamEnvelope {
417 type Error = Error;
418
419 fn try_from(expr: Expr) -> Result<Self> {
420 let Expr::Map(entries) = &expr else {
421 return Err(Error::TypeMismatch {
422 expected: "stream envelope map",
423 found: expr_kind(&expr),
424 });
425 };
426 ensure_fields(
427 entries,
428 &[
429 "envelope",
430 "version",
431 "stream-id",
432 "packet-id",
433 "media",
434 "direction",
435 "sequence",
436 "ticks",
437 "clock-domain",
438 "clock-domains",
439 "profile",
440 "diagnostics",
441 "packet",
442 ],
443 )?;
444 let tag = symbol_field(entries, "envelope")?;
445 if *tag != stream_envelope_tag_symbol() {
446 return Err(Error::Eval(format!(
447 "unknown stream envelope tag {}",
448 tag.as_qualified_str()
449 )));
450 }
451 let version = parse_string_field::<u32>(entries, "version")?;
452 if version != STREAM_ENVELOPE_VERSION {
453 return Err(Error::Eval(format!(
454 "unsupported stream envelope version {version}"
455 )));
456 }
457 let packet = StreamPacket::try_from(field(entries, "packet")?.clone())?;
458 let ticks = tick_list(entries, "ticks")?;
459 Self::new_with_clock_domains(
460 symbol_field(entries, "stream-id")?.clone(),
461 symbol_field(entries, "packet-id")?.clone(),
462 StreamMedia::from_symbol(symbol_field(entries, "media")?)?,
463 StreamDirection::from_symbol(symbol_field(entries, "direction")?)?,
464 parse_string_field::<u64>(entries, "sequence")?,
465 ticks,
466 ClockDomain::from_symbol(symbol_field(entries, "clock-domain")?)?,
467 clock_domain_list(entries, "clock-domains")?,
468 TransportProfile::from_expr(field(entries, "profile")?)?,
469 symbol_list(entries, "diagnostics")?.to_vec(),
470 packet,
471 )
472 }
473}
474
475pub fn stream_envelope_tag_symbol() -> Symbol {
480 Symbol::qualified("stream/envelope", "v1")
481}
482
483fn packet_id(stream_id: &Symbol, sequence: u64) -> Symbol {
484 Symbol::qualified(
485 "stream/packet-id",
486 format!("{}#{sequence}", stream_id.as_qualified_str()),
487 )
488}
489
490fn tick_expr(tick: &Tick) -> Expr {
491 Expr::Map(vec![
492 (
493 Expr::Symbol(Symbol::new("clock")),
494 Expr::Symbol(tick.clock.clone()),
495 ),
496 (Expr::Symbol(Symbol::new("index")), ref_expr(&tick.index)),
497 ])
498}
499
500fn tick_from_expr(expr: &Expr) -> Result<Tick> {
501 let Expr::Map(entries) = expr else {
502 return Err(Error::TypeMismatch {
503 expected: "stream tick map",
504 found: expr_kind(expr),
505 });
506 };
507 ensure_fields(entries, &["clock", "index"])?;
508 Ok(Tick::new(
509 symbol_field(entries, "clock")?.clone(),
510 ref_from_expr(field(entries, "index")?)?,
511 ))
512}
513
514fn parse_string_field<T>(entries: &[(Expr, Expr)], name: &str) -> Result<T>
515where
516 T: FromStr,
517 T::Err: std::fmt::Display,
518{
519 string_field(entries, name)?
520 .parse::<T>()
521 .map_err(|err| Error::Eval(format!("invalid stream envelope {name}: {err}")))
522}
523
524fn tick_list(entries: &[(Expr, Expr)], name: &str) -> Result<Vec<Tick>> {
525 list_field(entries, name)?
526 .iter()
527 .map(tick_from_expr)
528 .collect()
529}
530
531fn clock_domain_list(entries: &[(Expr, Expr)], name: &str) -> Result<Vec<ClockDomain>> {
532 symbol_list(entries, name)?
533 .iter()
534 .map(ClockDomain::from_symbol)
535 .collect()
536}
537
538fn symbol_list(entries: &[(Expr, Expr)], name: &str) -> Result<Vec<Symbol>> {
539 list_field(entries, name)?
540 .iter()
541 .map(|expr| match expr {
542 Expr::Symbol(symbol) => Ok(symbol.clone()),
543 other => Err(Error::TypeMismatch {
544 expected: "symbol list item",
545 found: expr_kind(other),
546 }),
547 })
548 .collect()
549}
550
551fn list_field<'a>(entries: &'a [(Expr, Expr)], name: &str) -> Result<&'a [Expr]> {
552 match field(entries, name)? {
553 Expr::List(items) => Ok(items),
554 other => Err(Error::TypeMismatch {
555 expected: "list field",
556 found: expr_kind(other),
557 }),
558 }
559}
560
561fn ensure_fields(entries: &[(Expr, Expr)], allowed: &[&str]) -> Result<()> {
562 for (key, _) in entries {
563 let Expr::Symbol(symbol) = key else {
564 return Err(Error::TypeMismatch {
565 expected: "symbol stream envelope field",
566 found: expr_kind(key),
567 });
568 };
569 if symbol.namespace.is_none() && allowed.contains(&symbol.name.as_ref()) {
570 continue;
571 }
572 return Err(Error::Eval(format!(
573 "unknown stream envelope field {}",
574 symbol.as_qualified_str()
575 )));
576 }
577 Ok(())
578}
579
580fn normalize_clock_domains(
581 primary: ClockDomain,
582 clock_domains: Vec<ClockDomain>,
583) -> Vec<ClockDomain> {
584 let mut domains = vec![primary];
585 for domain in clock_domains {
586 if !domains.contains(&domain) {
587 domains.push(domain);
588 }
589 }
590 domains
591}