1use sim_kernel::{
16 Claim, ClaimPattern, Cx, Error, Expr, Ref, Result, Symbol, stream_surface,
17 stream_surface::publish_stream_metadata_claims,
18};
19
20use crate::buffer::{BufferPolicy, expr_kind, field, string_field, symbol_field};
21use crate::{ClockDomain, LatencyClass};
22
/// Media kind attached to a stream.
///
/// Each variant has a matching symbol in the `stream/media` namespace; the
/// conversions live in the inherent `symbol` / `from_symbol` methods.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamMedia {
    /// Named `pcm` in the `stream/media` namespace.
    Pcm,
    /// Named `midi` in the `stream/media` namespace.
    Midi,
    /// Named `diagnostic` in the `stream/media` namespace.
    Diagnostic,
    /// Named `data` in the `stream/media` namespace.
    Data,
}
39
40impl StreamMedia {
41 pub fn symbol(self) -> Symbol {
43 match self {
44 Self::Pcm => Symbol::qualified("stream/media", "pcm"),
45 Self::Midi => Symbol::qualified("stream/media", "midi"),
46 Self::Diagnostic => Symbol::qualified("stream/media", "diagnostic"),
47 Self::Data => Symbol::qualified("stream/media", "data"),
48 }
49 }
50
51 pub fn from_symbol(symbol: &Symbol) -> Result<Self> {
55 match symbol.as_qualified_str().as_str() {
56 "stream/media/pcm" => Ok(Self::Pcm),
57 "stream/media/midi" => Ok(Self::Midi),
58 "stream/media/diagnostic" => Ok(Self::Diagnostic),
59 "stream/media/data" => Ok(Self::Data),
60 other => Err(Error::Eval(format!("unknown stream media {other}"))),
61 }
62 }
63}
64
/// Direction of a stream endpoint.
///
/// Each variant has a matching symbol in the `stream/direction` namespace;
/// the conversions live in the inherent `symbol` / `from_symbol` methods.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamDirection {
    /// Named `source` in the `stream/direction` namespace.
    Source,
    /// Named `sink` in the `stream/direction` namespace.
    Sink,
    /// Named `duplex` in the `stream/direction` namespace.
    Duplex,
}
75
76impl StreamDirection {
77 pub fn symbol(self) -> Symbol {
79 match self {
80 Self::Source => Symbol::qualified("stream/direction", "source"),
81 Self::Sink => Symbol::qualified("stream/direction", "sink"),
82 Self::Duplex => Symbol::qualified("stream/direction", "duplex"),
83 }
84 }
85
86 pub fn from_symbol(symbol: &Symbol) -> Result<Self> {
90 match symbol.as_qualified_str().as_str() {
91 "stream/direction/source" => Ok(Self::Source),
92 "stream/direction/sink" => Ok(Self::Sink),
93 "stream/direction/duplex" => Ok(Self::Duplex),
94 other => Err(Error::Eval(format!("unknown stream direction {other}"))),
95 }
96 }
97}
98
99#[derive(Clone, Copy, Debug, PartialEq, Eq)]
105pub struct RateContract {
106 clock_domain: ClockDomain,
107 latency_class: LatencyClass,
108 nominal_rate_hz: Option<u32>,
109}
110
111impl RateContract {
112 pub fn new(
115 clock_domain: ClockDomain,
116 latency_class: LatencyClass,
117 nominal_rate_hz: Option<u32>,
118 ) -> Self {
119 Self {
120 clock_domain,
121 latency_class,
122 nominal_rate_hz,
123 }
124 }
125
126 pub fn sample_exact(nominal_rate_hz: Option<u32>) -> Self {
129 Self::new(
130 ClockDomain::Sample,
131 LatencyClass::SampleExact,
132 nominal_rate_hz,
133 )
134 }
135
136 pub fn block_local() -> Self {
139 Self::new(ClockDomain::Block, LatencyClass::BlockLocal, None)
140 }
141
142 pub fn control() -> Self {
145 Self::new(ClockDomain::Control, LatencyClass::Interactive, None)
146 }
147
148 pub fn midi_tick() -> Self {
151 Self::new(ClockDomain::MidiTick, LatencyClass::Interactive, None)
152 }
153
154 pub fn trace_step() -> Self {
157 Self::new(ClockDomain::TraceStep, LatencyClass::OfflineRender, None)
158 }
159
160 pub fn clock_domain(self) -> ClockDomain {
162 self.clock_domain
163 }
164
165 pub fn latency_class(self) -> LatencyClass {
167 self.latency_class
168 }
169
170 pub fn nominal_rate_hz(self) -> Option<u32> {
172 self.nominal_rate_hz
173 }
174
175 pub fn is_compatible_with(self, other: Self) -> bool {
180 self.clock_domain == other.clock_domain
181 && self.latency_class == other.latency_class
182 && rates_are_compatible(self.nominal_rate_hz, other.nominal_rate_hz)
183 }
184
185 pub fn ensure_compatible(self, other: Self) -> Result<()> {
188 if self.is_compatible_with(other) {
189 return Ok(());
190 }
191 Err(Error::Eval(format!(
192 "incompatible port rate contracts: source {} {} {:?}, target {} {} {:?}",
193 self.clock_domain.wire_label(),
194 self.latency_class.wire_label(),
195 self.nominal_rate_hz,
196 other.clock_domain.wire_label(),
197 other.latency_class.wire_label(),
198 other.nominal_rate_hz
199 )))
200 }
201}
202
/// Decides whether two optional nominal rates can coexist.
///
/// A rate that is absent on either side never conflicts; two stated rates
/// are compatible only when they are equal.
fn rates_are_compatible(left: Option<u32>, right: Option<u32>) -> bool {
    let (Some(lhs), Some(rhs)) = (left, right) else {
        // At least one side leaves the rate open, so there is nothing to clash.
        return true;
    };
    lhs == rhs
}
209
210#[derive(Clone, Debug, PartialEq, Eq)]
218pub struct StreamMetadata {
219 id: Symbol,
220 media: StreamMedia,
221 direction: StreamDirection,
222 clock: Symbol,
223 buffer: BufferPolicy,
224}
225
226impl StreamMetadata {
227 pub fn new(
230 id: Symbol,
231 media: StreamMedia,
232 direction: StreamDirection,
233 clock: Symbol,
234 buffer: BufferPolicy,
235 ) -> Self {
236 Self {
237 id,
238 media,
239 direction,
240 clock,
241 buffer,
242 }
243 }
244
245 pub fn id(&self) -> &Symbol {
247 &self.id
248 }
249
250 pub fn media(&self) -> StreamMedia {
252 self.media
253 }
254
255 pub fn direction(&self) -> StreamDirection {
257 self.direction
258 }
259
260 pub fn clock(&self) -> &Symbol {
262 &self.clock
263 }
264
265 pub fn buffer(&self) -> &BufferPolicy {
267 &self.buffer
268 }
269
270 pub fn subject_ref(&self) -> Ref {
273 Ref::Symbol(self.id.clone())
274 }
275
276 pub fn to_constructor_args(&self) -> Vec<Expr> {
279 vec![
280 Expr::Symbol(self.id.clone()),
281 Expr::Symbol(self.media.symbol()),
282 Expr::Symbol(self.direction.symbol()),
283 Expr::Symbol(self.clock.clone()),
284 self.buffer.to_expr(),
285 ]
286 }
287
288 pub fn from_constructor_args(args: Vec<Expr>) -> Result<Self> {
293 let [id, media, direction, clock, buffer] = args.as_slice() else {
294 return Err(Error::Eval(
295 "stream/Metadata expects five constructor arguments".to_owned(),
296 ));
297 };
298 Ok(Self::new(
299 symbol_expr(id, "stream id")?,
300 StreamMedia::from_symbol(symbol_expr_ref(media, "stream media")?)?,
301 StreamDirection::from_symbol(symbol_expr_ref(direction, "stream direction")?)?,
302 symbol_expr(clock, "stream clock")?,
303 BufferPolicy::from_expr(buffer)?,
304 ))
305 }
306
307 pub fn table_expr(&self) -> Expr {
310 Expr::Map(vec![
311 (
312 Expr::Symbol(Symbol::new("kind")),
313 Expr::Symbol(stream_surface::stream_kind()),
314 ),
315 (
316 Expr::Symbol(Symbol::new("id")),
317 Expr::String(self.id.to_string()),
318 ),
319 (
320 Expr::Symbol(Symbol::new("media")),
321 Expr::Symbol(self.media.symbol()),
322 ),
323 (
324 Expr::Symbol(Symbol::new("direction")),
325 Expr::Symbol(self.direction.symbol()),
326 ),
327 (
328 Expr::Symbol(Symbol::new("clock")),
329 Expr::Symbol(self.clock.clone()),
330 ),
331 (Expr::Symbol(Symbol::new("buffer")), self.buffer.to_expr()),
332 ])
333 }
334
335 pub fn from_table_expr(expr: &Expr) -> Result<Self> {
341 let Expr::Map(entries) = expr else {
342 return Err(Error::TypeMismatch {
343 expected: "stream metadata map",
344 found: expr_kind(expr),
345 });
346 };
347 Ok(Self::new(
348 Symbol::new(string_field(entries, "id")?.to_owned()),
349 StreamMedia::from_symbol(symbol_field(entries, "media")?)?,
350 StreamDirection::from_symbol(symbol_field(entries, "direction")?)?,
351 symbol_field(entries, "clock")?.clone(),
352 BufferPolicy::from_expr(field(entries, "buffer")?)?,
353 ))
354 }
355}
356
357pub fn stream_id_predicate() -> Symbol {
359 Symbol::qualified("stream", "id")
360}
361
362pub fn stream_media_predicate() -> Symbol {
364 Symbol::qualified("stream", "media")
365}
366
367pub fn stream_direction_predicate() -> Symbol {
369 Symbol::qualified("stream", "direction")
370}
371
372pub fn stream_buffer_predicate() -> Symbol {
374 Symbol::qualified("stream", "buffer")
375}
376
377pub fn publish_metadata_claims(cx: &mut Cx, subject: Ref, metadata: &StreamMetadata) -> Result<()> {
386 publish_stream_metadata_claims(
387 cx,
388 subject.clone(),
389 [
390 (stream_id_predicate(), Ref::Symbol(metadata.id.clone())),
391 (
392 stream_media_predicate(),
393 Ref::Symbol(metadata.media.symbol()),
394 ),
395 (
396 stream_direction_predicate(),
397 Ref::Symbol(metadata.direction.symbol()),
398 ),
399 (
400 stream_surface::stream_clock_predicate(),
401 Ref::Symbol(metadata.clock.clone()),
402 ),
403 (
404 stream_buffer_predicate(),
405 Ref::Symbol(metadata.buffer.symbol()),
406 ),
407 ],
408 )?;
409 insert_once(
410 cx,
411 subject,
412 stream_surface::stream_transport_predicate(),
413 Ref::Symbol(Symbol::qualified("stream", "memory")),
414 )
415}
416
417fn insert_once(cx: &mut Cx, subject: Ref, predicate: Symbol, object: Ref) -> Result<()> {
418 let exists = !cx
419 .query_facts(ClaimPattern::exact(
420 subject.clone(),
421 predicate.clone(),
422 object.clone(),
423 ))?
424 .is_empty();
425 if !exists {
426 cx.insert_fact(Claim::public(subject, predicate, object))?;
427 }
428 Ok(())
429}
430
431fn symbol_expr(expr: &Expr, expected: &'static str) -> Result<Symbol> {
432 Ok(symbol_expr_ref(expr, expected)?.clone())
433}
434
435fn symbol_expr_ref<'a>(expr: &'a Expr, expected: &'static str) -> Result<&'a Symbol> {
436 match expr {
437 Expr::Symbol(symbol) => Ok(symbol),
438 other => Err(Error::TypeMismatch {
439 expected,
440 found: expr_kind(other),
441 }),
442 }
443}