Skip to main content

sim_lib_stream_fabric/
request.rs

1use sim_kernel::{
2    Datum, DatumStore, Error, Event, Expr, HandleId, ObserveMode, RealizeRequest, Ref, Result,
3    Symbol, Term,
4};
5use sim_lib_server::Site;
6use sim_lib_stream_core::{PlacedFragment, StreamEnvelope, StreamValue};
7
8/// Builds a realize request that observes a term as an event stream.
9///
10/// The request observes in `ObserveMode::Events` and carries an optional
11/// `buffer_limit`, framing the term for the location-transparent eval surface
12/// to drive as a stream.
13pub fn stream_realize_request(term: Term, buffer_limit: Option<usize>) -> RealizeRequest {
14    let mut request = RealizeRequest::new(term).observing(ObserveMode::Events);
15    request.buffer_limit = buffer_limit;
16    request
17}
18
19/// Drains a stream into the events of one realized run.
20///
21/// Emits a `started` event, one chunk event per packet up to the request's
22/// answer limit, and a `done` event once the stream reports completion.
23/// Requires the request to observe in `ObserveMode::Events`.
24pub fn realize_stream_events(
25    cx: &mut sim_kernel::Cx,
26    request: &RealizeRequest,
27    stream: &StreamValue,
28) -> Result<Vec<Event>> {
29    if request.observe != ObserveMode::Events {
30        return Err(Error::Eval(
31            "stream realization requires ObserveMode::Events".to_owned(),
32        ));
33    }
34    let run = Ref::Handle(HandleId::fresh());
35    let request_ref = realize_request_ref(cx, request)?;
36    let mut events = vec![Event::started(run.clone(), 0, request_ref)?];
37    let mut seq = 1u64;
38    let mut emitted = 0usize;
39    let limit = request.answer_limit.unwrap_or(usize::MAX);
40    while emitted < limit {
41        let Some(item) = stream.next_packet()? else {
42            break;
43        };
44        events.push(item.chunk_event(cx, run.clone(), seq)?);
45        seq = seq.saturating_add(1);
46        emitted += 1;
47    }
48    if stream.is_done()? {
49        events.push(Event::done(run, seq)?);
50    }
51    Ok(events)
52}
53
54/// Realizes a placed fragment on `site` and returns its output envelopes.
55///
56/// Delegates to `sim_lib_server::realize_stream_events`, keeping placed-stream
57/// realization on the realize surface rather than a transport-specific API.
58pub fn realize_placed_stream_events(
59    cx: &mut sim_kernel::Cx,
60    fragment: PlacedFragment,
61    site: &dyn Site,
62) -> Result<Vec<StreamEnvelope>> {
63    sim_lib_server::realize_stream_events(cx, fragment, site)
64}
65
66fn realize_request_ref(cx: &mut sim_kernel::Cx, request: &RealizeRequest) -> Result<Ref> {
67    let id = cx.datum_store_mut().intern(Datum::Node {
68        tag: Symbol::qualified("stream/fabric", "RealizeRequest"),
69        fields: vec![
70            (
71                Symbol::new("term"),
72                Datum::try_from(Expr::from(request.term.clone()))?,
73            ),
74            (
75                Symbol::new("observe"),
76                Datum::String(observe_name(request.observe).to_owned()),
77            ),
78            (
79                Symbol::new("buffer-limit"),
80                optional_usize_datum(request.buffer_limit),
81            ),
82            (
83                Symbol::new("answer-limit"),
84                optional_usize_datum(request.answer_limit),
85            ),
86        ],
87    })?;
88    Ok(Ref::Content(id))
89}
90
91fn optional_usize_datum(value: Option<usize>) -> Datum {
92    value.map_or(Datum::Nil, |value| Datum::String(value.to_string()))
93}
94
95fn observe_name(observe: ObserveMode) -> &'static str {
96    match observe {
97        ObserveMode::FinalOnly => "final-only",
98        ObserveMode::Events => "events",
99        ObserveMode::Ledger => "ledger",
100    }
101}