1use crate::{
7 card::card_kind_predicate,
8 claim::{Claim, ClaimPattern},
9 env::Cx,
10 error::{Error, Result},
11 event::{Event, EventKind, Tick},
12 id::Symbol,
13 ref_id::Ref,
14};
15
16pub fn stream_kind() -> Symbol {
18 Symbol::qualified("core", "stream")
19}
20
21pub fn stream_clock_predicate() -> Symbol {
23 Symbol::qualified("stream", "clock")
24}
25
26pub fn stream_codec_predicate() -> Symbol {
28 Symbol::qualified("stream", "codec")
29}
30
31pub fn stream_transport_predicate() -> Symbol {
33 Symbol::qualified("stream", "transport")
34}
35
36pub fn publish_stream_metadata_claims(
38 cx: &mut Cx,
39 stream: Ref,
40 metadata: impl IntoIterator<Item = (Symbol, Ref)>,
41) -> Result<()> {
42 insert_once(
43 cx,
44 stream.clone(),
45 card_kind_predicate(),
46 Ref::Symbol(stream_kind()),
47 )?;
48 for (predicate, object) in metadata {
49 insert_once(cx, stream.clone(), predicate, object)?;
50 }
51 Ok(())
52}
53
54pub fn stream_packet_event(run: Ref, seq: u64, ticks: Vec<Tick>, payload: Ref) -> Result<Event> {
56 match payload {
57 Ref::Content(_) | Ref::Handle(_) => {
58 Event::new(run, seq, ticks, EventKind::Chunk { payload })
59 }
60 Ref::Symbol(_) | Ref::Coord(_) => Err(Error::Eval(
61 "stream packet payload must be a content or handle ref".to_owned(),
62 )),
63 }
64}
65
66pub fn remote_stream_frame_event(
68 run: Ref,
69 seq: u64,
70 ticks: Vec<Tick>,
71 frame: Ref,
72) -> Result<Event> {
73 stream_packet_event(run, seq, ticks, frame)
74}
75
76fn insert_once(cx: &mut Cx, subject: Ref, predicate: Symbol, object: Ref) -> Result<()> {
77 let exists = !cx
78 .query_facts(ClaimPattern::exact(
79 subject.clone(),
80 predicate.clone(),
81 object.clone(),
82 ))?
83 .is_empty();
84 if !exists {
85 cx.insert_fact(Claim::public(subject, predicate, object))?;
86 }
87 Ok(())
88}
89
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        ContentId, Coordinate, DefaultFactory, Expr, HandleId, NoopEvalPolicy, card::card_for_ref,
    };
    use std::sync::Arc;

    /// Creates a context wired with the no-op eval policy and the default factory.
    fn new_cx() -> Cx {
        Cx::new(Arc::new(NoopEvalPolicy), Arc::new(DefaultFactory))
    }

    /// Wraps a qualified symbol in a `Ref::Symbol`.
    fn symbol_ref(namespace: &'static str, name: &'static str) -> Ref {
        Ref::Symbol(Symbol::qualified(namespace, name))
    }

    /// The clock object shared by both tests.
    fn sample_clock() -> Ref {
        symbol_ref("clock", "sample")
    }

    /// Published metadata is queryable as a claim, and a content payload yields a chunk event.
    #[test]
    fn stream_metadata_and_packet_events_use_claims_and_refs() {
        let mut cx = new_cx();
        let stream = Ref::Handle(HandleId::fresh());
        let metadata = [(stream_clock_predicate(), sample_clock())];
        publish_stream_metadata_claims(&mut cx, stream.clone(), metadata).unwrap();

        let pattern = ClaimPattern::exact(stream, stream_clock_predicate(), sample_clock());
        let claims = cx.query_facts(pattern).unwrap();
        assert_eq!(claims.len(), 1);

        let content = ContentId::from_bytes(Symbol::qualified("core", "sha256"), [7; 32]);
        let payload = Ref::Content(content);
        let run = symbol_ref("run", "one");
        let event = stream_packet_event(run, 0, Vec::new(), payload.clone()).unwrap();
        assert!(matches!(event.kind, EventKind::Chunk { payload: actual } if actual == payload));
    }

    /// The stream's card reports the stream kind, and symbol/coordinate payloads are rejected.
    #[test]
    fn stream_metadata_and_payload_rules_project_to_cards() {
        let mut cx = new_cx();
        let stream = Ref::Handle(HandleId::fresh());
        let metadata = [
            (stream_clock_predicate(), sample_clock()),
            (stream_codec_predicate(), symbol_ref("codec", "binary-base64")),
            (stream_transport_predicate(), symbol_ref("stream", "memory")),
        ];
        publish_stream_metadata_claims(&mut cx, stream.clone(), metadata).unwrap();

        let card = card_expr(&mut cx, stream.clone());
        let expected_kind = Expr::Symbol(stream_kind());
        assert_eq!(table_value(&card, "kind"), Some(&expected_kind));

        let run = symbol_ref("run", "one");
        let ordinal = ContentId::from_bytes(Symbol::qualified("core", "sha256"), [3; 32]);
        let coordinate = Ref::Coord(Coordinate {
            space: Symbol::qualified("rank", "space"),
            ordinal,
        });
        // Neither a bare symbol nor a coordinate may be used as a packet payload.
        let rejected = [Ref::Symbol(Symbol::new("bad")), coordinate];
        for payload in rejected {
            assert!(stream_packet_event(run.clone(), 0, Vec::new(), payload).is_err());
        }
    }

    /// Resolves the card for `subject` and converts its object into an expression.
    fn card_expr(cx: &mut Cx, subject: Ref) -> Expr {
        let card = card_for_ref(cx, subject).unwrap();
        card.object().as_expr(cx).unwrap()
    }

    /// Looks up the value stored under the symbol key `key` in a map expression.
    ///
    /// Returns `None` when `expr` is not a map or no symbol key matches.
    fn table_value<'a>(expr: &'a Expr, key: &str) -> Option<&'a Expr> {
        match expr {
            Expr::Map(entries) => {
                let wanted = Symbol::new(key);
                entries
                    .iter()
                    .find_map(|(entry_key, entry_value)| match entry_key {
                        Expr::Symbol(symbol) if symbol == &wanted => Some(entry_value),
                        _ => None,
                    })
            }
            _ => None,
        }
    }
}