sim_lib_stream_fabric/
request.rs1use sim_kernel::{
2 Datum, DatumStore, Error, Event, Expr, HandleId, ObserveMode, RealizeRequest, Ref, Result,
3 Symbol, Term,
4};
5use sim_lib_server::Site;
6use sim_lib_stream_core::{PlacedFragment, StreamEnvelope, StreamValue};
7
8pub fn stream_realize_request(term: Term, buffer_limit: Option<usize>) -> RealizeRequest {
14 let mut request = RealizeRequest::new(term).observing(ObserveMode::Events);
15 request.buffer_limit = buffer_limit;
16 request
17}
18
19pub fn realize_stream_events(
25 cx: &mut sim_kernel::Cx,
26 request: &RealizeRequest,
27 stream: &StreamValue,
28) -> Result<Vec<Event>> {
29 if request.observe != ObserveMode::Events {
30 return Err(Error::Eval(
31 "stream realization requires ObserveMode::Events".to_owned(),
32 ));
33 }
34 let run = Ref::Handle(HandleId::fresh());
35 let request_ref = realize_request_ref(cx, request)?;
36 let mut events = vec![Event::started(run.clone(), 0, request_ref)?];
37 let mut seq = 1u64;
38 let mut emitted = 0usize;
39 let limit = request.answer_limit.unwrap_or(usize::MAX);
40 while emitted < limit {
41 let Some(item) = stream.next_packet()? else {
42 break;
43 };
44 events.push(item.chunk_event(cx, run.clone(), seq)?);
45 seq = seq.saturating_add(1);
46 emitted += 1;
47 }
48 if stream.is_done()? {
49 events.push(Event::done(run, seq)?);
50 }
51 Ok(events)
52}
53
54pub fn realize_placed_stream_events(
59 cx: &mut sim_kernel::Cx,
60 fragment: PlacedFragment,
61 site: &dyn Site,
62) -> Result<Vec<StreamEnvelope>> {
63 sim_lib_server::realize_stream_events(cx, fragment, site)
64}
65
66fn realize_request_ref(cx: &mut sim_kernel::Cx, request: &RealizeRequest) -> Result<Ref> {
67 let id = cx.datum_store_mut().intern(Datum::Node {
68 tag: Symbol::qualified("stream/fabric", "RealizeRequest"),
69 fields: vec![
70 (
71 Symbol::new("term"),
72 Datum::try_from(Expr::from(request.term.clone()))?,
73 ),
74 (
75 Symbol::new("observe"),
76 Datum::String(observe_name(request.observe).to_owned()),
77 ),
78 (
79 Symbol::new("buffer-limit"),
80 optional_usize_datum(request.buffer_limit),
81 ),
82 (
83 Symbol::new("answer-limit"),
84 optional_usize_datum(request.answer_limit),
85 ),
86 ],
87 })?;
88 Ok(Ref::Content(id))
89}
90
91fn optional_usize_datum(value: Option<usize>) -> Datum {
92 value.map_or(Datum::Nil, |value| Datum::String(value.to_string()))
93}
94
95fn observe_name(observe: ObserveMode) -> &'static str {
96 match observe {
97 ObserveMode::FinalOnly => "final-only",
98 ObserveMode::Events => "events",
99 ObserveMode::Ledger => "ledger",
100 }
101}