Skip to main content

sim_kernel/
stream_surface.rs

1//! Stream metadata and packet events: the contract for stream transport.
2//!
3//! The kernel defines the stream predicate vocabulary and the packet/frame
4//! event shapes; concrete stream transports and clocks live in library crates.
5
6use crate::{
7    card::card_kind_predicate,
8    claim::{Claim, ClaimPattern},
9    env::Cx,
10    error::{Error, Result},
11    event::{Event, EventKind, Tick},
12    id::Symbol,
13    ref_id::Ref,
14};
15
16/// Card-kind symbol marking a ref as a stream (`core/stream`).
17pub fn stream_kind() -> Symbol {
18    Symbol::qualified("core", "stream")
19}
20
21/// Predicate symbol naming a stream's clock (`stream/clock`).
22pub fn stream_clock_predicate() -> Symbol {
23    Symbol::qualified("stream", "clock")
24}
25
26/// Predicate symbol naming a stream's payload codec (`stream/codec`).
27pub fn stream_codec_predicate() -> Symbol {
28    Symbol::qualified("stream", "codec")
29}
30
31/// Predicate symbol naming a stream's transport (`stream/transport`).
32pub fn stream_transport_predicate() -> Symbol {
33    Symbol::qualified("stream", "transport")
34}
35
36/// Records the stream kind plus the given metadata predicates as facts, once.
37pub fn publish_stream_metadata_claims(
38    cx: &mut Cx,
39    stream: Ref,
40    metadata: impl IntoIterator<Item = (Symbol, Ref)>,
41) -> Result<()> {
42    insert_once(
43        cx,
44        stream.clone(),
45        card_kind_predicate(),
46        Ref::Symbol(stream_kind()),
47    )?;
48    for (predicate, object) in metadata {
49        insert_once(cx, stream.clone(), predicate, object)?;
50    }
51    Ok(())
52}
53
54/// Builds a stream packet event whose payload is a content or handle ref.
55pub fn stream_packet_event(run: Ref, seq: u64, ticks: Vec<Tick>, payload: Ref) -> Result<Event> {
56    match payload {
57        Ref::Content(_) | Ref::Handle(_) => {
58            Event::new(run, seq, ticks, EventKind::Chunk { payload })
59        }
60        Ref::Symbol(_) | Ref::Coord(_) => Err(Error::Eval(
61            "stream packet payload must be a content or handle ref".to_owned(),
62        )),
63    }
64}
65
66/// Builds a remote stream frame event; an alias for [`stream_packet_event`].
67pub fn remote_stream_frame_event(
68    run: Ref,
69    seq: u64,
70    ticks: Vec<Tick>,
71    frame: Ref,
72) -> Result<Event> {
73    stream_packet_event(run, seq, ticks, frame)
74}
75
76fn insert_once(cx: &mut Cx, subject: Ref, predicate: Symbol, object: Ref) -> Result<()> {
77    let exists = !cx
78        .query_facts(ClaimPattern::exact(
79            subject.clone(),
80            predicate.clone(),
81            object.clone(),
82        ))?
83        .is_empty();
84    if !exists {
85        cx.insert_fact(Claim::public(subject, predicate, object))?;
86    }
87    Ok(())
88}
89
90#[cfg(test)]
91mod tests {
92    use super::*;
93    use crate::{
94        ContentId, Coordinate, DefaultFactory, Expr, HandleId, NoopEvalPolicy, card::card_for_ref,
95    };
96    use std::sync::Arc;
97
98    #[test]
99    fn stream_metadata_and_packet_events_use_claims_and_refs() {
100        let mut cx = Cx::new(Arc::new(NoopEvalPolicy), Arc::new(DefaultFactory));
101        let stream = Ref::Handle(HandleId::fresh());
102        publish_stream_metadata_claims(
103            &mut cx,
104            stream.clone(),
105            [(
106                stream_clock_predicate(),
107                Ref::Symbol(Symbol::qualified("clock", "sample")),
108            )],
109        )
110        .unwrap();
111
112        let claims = cx
113            .query_facts(ClaimPattern::exact(
114                stream,
115                stream_clock_predicate(),
116                Ref::Symbol(Symbol::qualified("clock", "sample")),
117            ))
118            .unwrap();
119        assert_eq!(claims.len(), 1);
120
121        let payload = Ref::Content(ContentId::from_bytes(
122            Symbol::qualified("core", "sha256"),
123            [7; 32],
124        ));
125        let event = stream_packet_event(
126            Ref::Symbol(Symbol::qualified("run", "one")),
127            0,
128            Vec::new(),
129            payload.clone(),
130        )
131        .unwrap();
132        assert!(matches!(event.kind, EventKind::Chunk { payload: actual } if actual == payload));
133    }
134
135    #[test]
136    fn stream_metadata_and_payload_rules_project_to_cards() {
137        let mut cx = Cx::new(Arc::new(NoopEvalPolicy), Arc::new(DefaultFactory));
138        let stream = Ref::Handle(HandleId::fresh());
139        publish_stream_metadata_claims(
140            &mut cx,
141            stream.clone(),
142            [
143                (
144                    stream_clock_predicate(),
145                    Ref::Symbol(Symbol::qualified("clock", "sample")),
146                ),
147                (
148                    stream_codec_predicate(),
149                    Ref::Symbol(Symbol::qualified("codec", "binary-base64")),
150                ),
151                (
152                    stream_transport_predicate(),
153                    Ref::Symbol(Symbol::qualified("stream", "memory")),
154                ),
155            ],
156        )
157        .unwrap();
158
159        let card = card_expr(&mut cx, stream.clone());
160        assert_eq!(
161            table_value(&card, "kind"),
162            Some(&Expr::Symbol(stream_kind()))
163        );
164
165        let run = Ref::Symbol(Symbol::qualified("run", "one"));
166        let ordinal = ContentId::from_bytes(Symbol::qualified("core", "sha256"), [3; 32]);
167        let coordinate = Ref::Coord(Coordinate {
168            space: Symbol::qualified("rank", "space"),
169            ordinal,
170        });
171        assert!(
172            stream_packet_event(run.clone(), 0, Vec::new(), Ref::Symbol(Symbol::new("bad")))
173                .is_err()
174        );
175        assert!(stream_packet_event(run, 0, Vec::new(), coordinate).is_err());
176    }
177
178    fn card_expr(cx: &mut Cx, subject: Ref) -> Expr {
179        card_for_ref(cx, subject)
180            .unwrap()
181            .object()
182            .as_expr(cx)
183            .unwrap()
184    }
185
186    fn table_value<'a>(expr: &'a Expr, key: &str) -> Option<&'a Expr> {
187        let Expr::Map(entries) = expr else {
188            return None;
189        };
190        entries.iter().find_map(|(entry_key, entry_value)| {
191            let Expr::Symbol(entry_key) = entry_key else {
192                return None;
193            };
194            (entry_key == &Symbol::new(key)).then_some(entry_value)
195        })
196    }
197}