use crate::{
card::card_kind_predicate,
claim::{Claim, ClaimPattern},
env::Cx,
error::{Error, Result},
event::{Event, EventKind, Tick},
id::Symbol,
ref_id::Ref,
};
/// Returns the symbol built by `Symbol::qualified("core", "stream")`.
///
/// [`publish_stream_metadata_claims`] publishes this symbol as the object of
/// the card-kind claim it asserts on a stream.
pub fn stream_kind() -> Symbol {
    let (qualifier, name) = ("core", "stream");
    Symbol::qualified(qualifier, name)
}
/// Returns the predicate symbol built by `Symbol::qualified("stream", "clock")`.
///
/// Intended for use as the predicate half of a `(predicate, object)` metadata
/// pair passed to [`publish_stream_metadata_claims`].
pub fn stream_clock_predicate() -> Symbol {
    let (qualifier, name) = ("stream", "clock");
    Symbol::qualified(qualifier, name)
}
/// Returns the predicate symbol built by `Symbol::qualified("stream", "codec")`.
///
/// Intended for use as the predicate half of a `(predicate, object)` metadata
/// pair passed to [`publish_stream_metadata_claims`].
pub fn stream_codec_predicate() -> Symbol {
    let (qualifier, name) = ("stream", "codec");
    Symbol::qualified(qualifier, name)
}
/// Returns the predicate symbol built by
/// `Symbol::qualified("stream", "transport")`.
///
/// Intended for use as the predicate half of a `(predicate, object)` metadata
/// pair passed to [`publish_stream_metadata_claims`].
pub fn stream_transport_predicate() -> Symbol {
    let (qualifier, name) = ("stream", "transport");
    Symbol::qualified(qualifier, name)
}
/// Publishes the claims that describe `stream` as a stream card.
///
/// The card-kind claim (`card_kind_predicate()` with [`stream_kind`] as its
/// object) is asserted first, followed by one claim per `(predicate, object)`
/// pair of `metadata`, in iteration order. A claim that is already present
/// exactly is not inserted a second time.
///
/// # Errors
///
/// Stops at and returns the first error raised while querying or inserting a
/// fact. No rollback is attempted here for claims inserted before the failure.
pub fn publish_stream_metadata_claims(
    cx: &mut Cx,
    stream: Ref,
    metadata: impl IntoIterator<Item = (Symbol, Ref)>,
) -> Result<()> {
    // The kind claim goes in before any caller-supplied metadata.
    insert_once(cx, stream.clone(), card_kind_predicate(), Ref::Symbol(stream_kind()))?;

    // `try_for_each` short-circuits on the first `Err`, like `?` inside a loop.
    metadata
        .into_iter()
        .try_for_each(|(predicate, object)| insert_once(cx, stream.clone(), predicate, object))
}
pub fn stream_packet_event(run: Ref, seq: u64, ticks: Vec<Tick>, payload: Ref) -> Result<Event> {
match payload {
Ref::Content(_) | Ref::Handle(_) => {
Event::new(run, seq, ticks, EventKind::Chunk { payload })
}
Ref::Symbol(_) | Ref::Coord(_) => Err(Error::Eval(
"stream packet payload must be a content or handle ref".to_owned(),
)),
}
}
/// Builds the event for a remote stream frame.
///
/// This delegates to [`stream_packet_event`] with `frame` as the payload, so
/// the same payload restriction and the same errors apply.
pub fn remote_stream_frame_event(
    run: Ref,
    seq: u64,
    ticks: Vec<Tick>,
    frame: Ref,
) -> Result<Event> {
    // A frame is treated exactly like a packet payload.
    let payload = frame;
    stream_packet_event(run, seq, ticks, payload)
}
/// Inserts the public claim `(subject, predicate, object)` unless an exactly
/// matching fact is already returned by `cx.query_facts`.
///
/// # Errors
///
/// Propagates any error from the lookup or from the insertion.
fn insert_once(cx: &mut Cx, subject: Ref, predicate: Symbol, object: Ref) -> Result<()> {
    // The pattern takes clones because the originals are still needed to build
    // the claim when no match is found.
    let exact = ClaimPattern::exact(subject.clone(), predicate.clone(), object.clone());

    // The query result is reduced to a bool inside this one statement, so it
    // is released before `cx` is used again for the insert.
    let already_present = !cx.query_facts(exact)?.is_empty();
    if already_present {
        return Ok(());
    }

    cx.insert_fact(Claim::public(subject, predicate, object))?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        ContentId, Coordinate, DefaultFactory, Expr, HandleId, NoopEvalPolicy, card::card_for_ref,
    };
    use std::sync::Arc;

    /// Builds a context from the no-op eval policy and the default factory.
    fn fresh_cx() -> Cx {
        Cx::new(Arc::new(NoopEvalPolicy), Arc::new(DefaultFactory))
    }

    /// The clock value used as metadata object in both tests.
    fn sample_clock() -> Ref {
        Ref::Symbol(Symbol::qualified("clock", "sample"))
    }

    /// The run reference used for every event built in these tests.
    fn run_one() -> Ref {
        Ref::Symbol(Symbol::qualified("run", "one"))
    }

    /// A `core/sha256` content id whose 32 bytes are all `fill`.
    fn sha256_content(fill: u8) -> ContentId {
        ContentId::from_bytes(Symbol::qualified("core", "sha256"), [fill; 32])
    }

    #[test]
    fn stream_metadata_and_packet_events_use_claims_and_refs() {
        let mut cx = fresh_cx();
        let stream = Ref::Handle(HandleId::fresh());

        // Publishing metadata must leave exactly one matching clock claim.
        let clock_claim = (stream_clock_predicate(), sample_clock());
        publish_stream_metadata_claims(&mut cx, stream.clone(), [clock_claim]).unwrap();

        let pattern = ClaimPattern::exact(stream, stream_clock_predicate(), sample_clock());
        let claims = cx.query_facts(pattern).unwrap();
        assert_eq!(claims.len(), 1);

        // A content payload is accepted and carried through unchanged.
        let payload = Ref::Content(sha256_content(7));
        let event = stream_packet_event(run_one(), 0, Vec::new(), payload.clone()).unwrap();
        assert!(matches!(event.kind, EventKind::Chunk { payload: actual } if actual == payload));
    }

    #[test]
    fn stream_metadata_and_payload_rules_project_to_cards() {
        let mut cx = fresh_cx();
        let stream = Ref::Handle(HandleId::fresh());

        let metadata = [
            (stream_clock_predicate(), sample_clock()),
            (
                stream_codec_predicate(),
                Ref::Symbol(Symbol::qualified("codec", "binary-base64")),
            ),
            (
                stream_transport_predicate(),
                Ref::Symbol(Symbol::qualified("stream", "memory")),
            ),
        ];
        publish_stream_metadata_claims(&mut cx, stream.clone(), metadata).unwrap();

        // The published kind claim must show up as the card's `kind` entry.
        let card = card_expr(&mut cx, stream.clone());
        let expected_kind = Expr::Symbol(stream_kind());
        assert_eq!(table_value(&card, "kind"), Some(&expected_kind));

        // Symbol and coordinate refs are both rejected as packet payloads.
        let run = run_one();
        let coordinate = Ref::Coord(Coordinate {
            space: Symbol::qualified("rank", "space"),
            ordinal: sha256_content(3),
        });
        for rejected in [Ref::Symbol(Symbol::new("bad")), coordinate] {
            assert!(stream_packet_event(run.clone(), 0, Vec::new(), rejected).is_err());
        }
    }

    /// Resolves the card for `subject` and renders its object as an `Expr`,
    /// panicking if either step fails.
    fn card_expr(cx: &mut Cx, subject: Ref) -> Expr {
        let card = card_for_ref(cx, subject).unwrap();
        let object = card.object();
        object.as_expr(cx).unwrap()
    }

    /// Looks up the value stored under the symbol key `key` in a map
    /// expression; yields `None` for non-map expressions and missing keys.
    fn table_value<'a>(expr: &'a Expr, key: &str) -> Option<&'a Expr> {
        match expr {
            Expr::Map(entries) => entries.iter().find_map(|(entry_key, entry_value)| {
                match entry_key {
                    Expr::Symbol(name) if name == &Symbol::new(key) => Some(entry_value),
                    _ => None,
                }
            }),
            _ => None,
        }
    }
}