use std::str::FromStr;
#[path = "envelope/profile.rs"]
mod profile;
#[path = "envelope/ref_codec.rs"]
mod ref_codec;
use sim_kernel::{Error, Expr, Result, Symbol, Tick};
use crate::buffer::{expr_kind, field, string_field, symbol_field};
use crate::{StreamDirection, StreamItem, StreamMedia, StreamMetadata, StreamPacket};
pub use profile::{LatencyClass, StreamCapability, TransportProfile};
use ref_codec::{ref_expr, ref_from_expr};
pub const STREAM_ENVELOPE_VERSION: u32 = 1;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ClockDomain {
Sample,
Block,
Control,
MidiTick,
Wall,
Transport,
ServerFrame,
BrowserFrame,
TraceStep,
Job,
}
impl ClockDomain {
pub fn wire_label(self) -> &'static str {
match self {
Self::Sample => "sample",
Self::Block => "block",
Self::Control => "control",
Self::MidiTick => "midi-tick",
Self::Wall => "wall",
Self::Transport => "transport",
Self::ServerFrame => "server-frame",
Self::BrowserFrame => "browser-frame",
Self::TraceStep => "trace-step",
Self::Job => "job",
}
}
pub fn symbol(self) -> Symbol {
Symbol::qualified("stream/clock-domain", self.wire_label())
}
pub fn from_symbol(symbol: &Symbol) -> Result<Self> {
match symbol.as_qualified_str().as_str() {
"sample" | "clock/sample" | "stream/clock-domain/sample" => Ok(Self::Sample),
"block" | "clock/block" | "stream/clock-domain/block" => Ok(Self::Block),
"control" | "clock/control" | "stream/clock-domain/control" => Ok(Self::Control),
"midi"
| "midi-tick"
| "clock/midi"
| "clock/midi-tick"
| "stream/clock-domain/midi-tick" => Ok(Self::MidiTick),
"wall" | "clock/wall" | "stream/clock-domain/wall" => Ok(Self::Wall),
"transport" | "clock/transport" | "stream/clock-domain/transport" => {
Ok(Self::Transport)
}
"server-frame" | "clock/server-frame" | "stream/clock-domain/server-frame" => {
Ok(Self::ServerFrame)
}
"browser-frame" | "clock/browser-frame" | "stream/clock-domain/browser-frame" => {
Ok(Self::BrowserFrame)
}
"trace-step" | "clock/trace-step" | "stream/clock-domain/trace-step" => {
Ok(Self::TraceStep)
}
"job" | "clock/job" | "stream/clock-domain/job" => Ok(Self::Job),
other => Err(Error::Eval(format!("unknown stream clock domain {other}"))),
}
}
pub fn for_stream_clock(symbol: &Symbol) -> Self {
Self::from_symbol(symbol).unwrap_or(Self::ServerFrame)
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StreamEnvelope {
version: u32,
stream_id: Symbol,
packet_id: Symbol,
media: StreamMedia,
direction: StreamDirection,
sequence: u64,
ticks: Vec<Tick>,
clock_domain: ClockDomain,
clock_domains: Vec<ClockDomain>,
profile: TransportProfile,
diagnostics: Vec<Symbol>,
packet: StreamPacket,
}
impl StreamEnvelope {
#[allow(clippy::too_many_arguments)]
pub fn new(
stream_id: Symbol,
packet_id: Symbol,
media: StreamMedia,
direction: StreamDirection,
sequence: u64,
ticks: Vec<Tick>,
clock_domain: ClockDomain,
profile: TransportProfile,
diagnostics: Vec<Symbol>,
packet: StreamPacket,
) -> Result<Self> {
Self::new_with_clock_domains(
stream_id,
packet_id,
media,
direction,
sequence,
ticks,
clock_domain,
vec![clock_domain],
profile,
diagnostics,
packet,
)
}
#[allow(clippy::too_many_arguments)]
pub fn new_with_clock_domains(
stream_id: Symbol,
packet_id: Symbol,
media: StreamMedia,
direction: StreamDirection,
sequence: u64,
ticks: Vec<Tick>,
clock_domain: ClockDomain,
clock_domains: Vec<ClockDomain>,
profile: TransportProfile,
diagnostics: Vec<Symbol>,
packet: StreamPacket,
) -> Result<Self> {
sim_kernel::validate_ticks(&ticks)?;
let packet_media = packet.media();
if packet_media != media {
return Err(Error::Eval(format!(
"stream envelope media {} does not match packet media {}",
media.symbol(),
packet_media.symbol()
)));
}
let mut all_clock_domains = clock_domains;
for tick in &ticks {
all_clock_domains.push(ClockDomain::from_symbol(&tick.clock)?);
}
let clock_domains = normalize_clock_domains(clock_domain, all_clock_domains);
Ok(Self {
version: STREAM_ENVELOPE_VERSION,
stream_id,
packet_id,
media,
direction,
sequence,
ticks,
clock_domain,
clock_domains,
profile,
diagnostics,
packet,
})
}
pub fn from_item(metadata: &StreamMetadata, sequence: u64, item: &StreamItem) -> Result<Self> {
Self::from_item_with_profile(metadata, sequence, item, TransportProfile::memory_local())
}
pub fn from_item_with_profile(
metadata: &StreamMetadata,
sequence: u64,
item: &StreamItem,
profile: TransportProfile,
) -> Result<Self> {
Self::new(
metadata.id().clone(),
packet_id(metadata.id(), sequence),
metadata.media(),
metadata.direction(),
sequence,
item.ticks().to_vec(),
ClockDomain::for_stream_clock(metadata.clock()),
profile,
Vec::new(),
item.packet().clone(),
)
}
pub fn version(&self) -> u32 {
self.version
}
pub fn stream_id(&self) -> &Symbol {
&self.stream_id
}
pub fn packet_id(&self) -> &Symbol {
&self.packet_id
}
pub fn media(&self) -> StreamMedia {
self.media
}
pub fn direction(&self) -> StreamDirection {
self.direction
}
pub fn sequence(&self) -> u64 {
self.sequence
}
pub fn ticks(&self) -> &[Tick] {
&self.ticks
}
pub fn clock_domain(&self) -> ClockDomain {
self.clock_domain
}
pub fn clock_domains(&self) -> &[ClockDomain] {
&self.clock_domains
}
pub fn profile(&self) -> &TransportProfile {
&self.profile
}
pub fn diagnostics(&self) -> &[Symbol] {
&self.diagnostics
}
pub fn packet(&self) -> &StreamPacket {
&self.packet
}
pub fn to_expr(&self) -> Expr {
Expr::Map(vec![
(
Expr::Symbol(Symbol::new("envelope")),
Expr::Symbol(stream_envelope_tag_symbol()),
),
(
Expr::Symbol(Symbol::new("version")),
Expr::String(self.version.to_string()),
),
(
Expr::Symbol(Symbol::new("stream-id")),
Expr::Symbol(self.stream_id.clone()),
),
(
Expr::Symbol(Symbol::new("packet-id")),
Expr::Symbol(self.packet_id.clone()),
),
(
Expr::Symbol(Symbol::new("media")),
Expr::Symbol(self.media.symbol()),
),
(
Expr::Symbol(Symbol::new("direction")),
Expr::Symbol(self.direction.symbol()),
),
(
Expr::Symbol(Symbol::new("sequence")),
Expr::String(self.sequence.to_string()),
),
(
Expr::Symbol(Symbol::new("ticks")),
Expr::List(self.ticks.iter().map(tick_expr).collect()),
),
(
Expr::Symbol(Symbol::new("clock-domain")),
Expr::Symbol(self.clock_domain.symbol()),
),
(
Expr::Symbol(Symbol::new("clock-domains")),
Expr::List(
self.clock_domains
.iter()
.map(|domain| Expr::Symbol(domain.symbol()))
.collect(),
),
),
(Expr::Symbol(Symbol::new("profile")), self.profile.to_expr()),
(
Expr::Symbol(Symbol::new("diagnostics")),
Expr::List(self.diagnostics.iter().cloned().map(Expr::Symbol).collect()),
),
(Expr::Symbol(Symbol::new("packet")), self.packet.to_expr()),
])
}
}
impl TryFrom<Expr> for StreamEnvelope {
type Error = Error;
fn try_from(expr: Expr) -> Result<Self> {
let Expr::Map(entries) = &expr else {
return Err(Error::TypeMismatch {
expected: "stream envelope map",
found: expr_kind(&expr),
});
};
ensure_fields(
entries,
&[
"envelope",
"version",
"stream-id",
"packet-id",
"media",
"direction",
"sequence",
"ticks",
"clock-domain",
"clock-domains",
"profile",
"diagnostics",
"packet",
],
)?;
let tag = symbol_field(entries, "envelope")?;
if *tag != stream_envelope_tag_symbol() {
return Err(Error::Eval(format!(
"unknown stream envelope tag {}",
tag.as_qualified_str()
)));
}
let version = parse_string_field::<u32>(entries, "version")?;
if version != STREAM_ENVELOPE_VERSION {
return Err(Error::Eval(format!(
"unsupported stream envelope version {version}"
)));
}
let packet = StreamPacket::try_from(field(entries, "packet")?.clone())?;
let ticks = tick_list(entries, "ticks")?;
Self::new_with_clock_domains(
symbol_field(entries, "stream-id")?.clone(),
symbol_field(entries, "packet-id")?.clone(),
StreamMedia::from_symbol(symbol_field(entries, "media")?)?,
StreamDirection::from_symbol(symbol_field(entries, "direction")?)?,
parse_string_field::<u64>(entries, "sequence")?,
ticks,
ClockDomain::from_symbol(symbol_field(entries, "clock-domain")?)?,
clock_domain_list(entries, "clock-domains")?,
TransportProfile::from_expr(field(entries, "profile")?)?,
symbol_list(entries, "diagnostics")?.to_vec(),
packet,
)
}
}
pub fn stream_envelope_tag_symbol() -> Symbol {
Symbol::qualified("stream/envelope", "v1")
}
fn packet_id(stream_id: &Symbol, sequence: u64) -> Symbol {
Symbol::qualified(
"stream/packet-id",
format!("{}#{sequence}", stream_id.as_qualified_str()),
)
}
fn tick_expr(tick: &Tick) -> Expr {
Expr::Map(vec![
(
Expr::Symbol(Symbol::new("clock")),
Expr::Symbol(tick.clock.clone()),
),
(Expr::Symbol(Symbol::new("index")), ref_expr(&tick.index)),
])
}
fn tick_from_expr(expr: &Expr) -> Result<Tick> {
let Expr::Map(entries) = expr else {
return Err(Error::TypeMismatch {
expected: "stream tick map",
found: expr_kind(expr),
});
};
ensure_fields(entries, &["clock", "index"])?;
Ok(Tick::new(
symbol_field(entries, "clock")?.clone(),
ref_from_expr(field(entries, "index")?)?,
))
}
fn parse_string_field<T>(entries: &[(Expr, Expr)], name: &str) -> Result<T>
where
T: FromStr,
T::Err: std::fmt::Display,
{
string_field(entries, name)?
.parse::<T>()
.map_err(|err| Error::Eval(format!("invalid stream envelope {name}: {err}")))
}
fn tick_list(entries: &[(Expr, Expr)], name: &str) -> Result<Vec<Tick>> {
list_field(entries, name)?
.iter()
.map(tick_from_expr)
.collect()
}
fn clock_domain_list(entries: &[(Expr, Expr)], name: &str) -> Result<Vec<ClockDomain>> {
symbol_list(entries, name)?
.iter()
.map(ClockDomain::from_symbol)
.collect()
}
fn symbol_list(entries: &[(Expr, Expr)], name: &str) -> Result<Vec<Symbol>> {
list_field(entries, name)?
.iter()
.map(|expr| match expr {
Expr::Symbol(symbol) => Ok(symbol.clone()),
other => Err(Error::TypeMismatch {
expected: "symbol list item",
found: expr_kind(other),
}),
})
.collect()
}
fn list_field<'a>(entries: &'a [(Expr, Expr)], name: &str) -> Result<&'a [Expr]> {
match field(entries, name)? {
Expr::List(items) => Ok(items),
other => Err(Error::TypeMismatch {
expected: "list field",
found: expr_kind(other),
}),
}
}
fn ensure_fields(entries: &[(Expr, Expr)], allowed: &[&str]) -> Result<()> {
for (key, _) in entries {
let Expr::Symbol(symbol) = key else {
return Err(Error::TypeMismatch {
expected: "symbol stream envelope field",
found: expr_kind(key),
});
};
if symbol.namespace.is_none() && allowed.contains(&symbol.name.as_ref()) {
continue;
}
return Err(Error::Eval(format!(
"unknown stream envelope field {}",
symbol.as_qualified_str()
)));
}
Ok(())
}
fn normalize_clock_domains(
primary: ClockDomain,
clock_domains: Vec<ClockDomain>,
) -> Vec<ClockDomain> {
let mut domains = vec![primary];
for domain in clock_domains {
if !domains.contains(&domain) {
domains.push(domain);
}
}
domains
}