Skip to main content

sim_kernel/
realize.rs

1//! The `realize` surface: the location-transparent distributed eval contract.
2//!
3//! The kernel defines the realize request, observe modes, and event-draining
4//! contract that server and agent code target instead of transport-specific
5//! APIs; libraries supply the concrete transports behind it.
6
7use std::{collections::VecDeque, sync::Mutex, time::Duration};
8
9use crate::{
10    capability::CapabilityName,
11    env::Cx,
12    error::{Error, Result},
13    eval::{Consistency, EvalFabric, EvalMode, EvalReply, EvalRequest},
14    event::{Event, EventKind, EventSource},
15    event_ledger::EventLedger,
16    expr::Expr,
17    handle_store::HandleStore,
18    id::{CORE_SEQUENCE_CLASS_ID, Symbol},
19    object::{ClassRef, Object, ShapeRef},
20    ref_id::{HandleId, Ref},
21    ref_resolver::{RefResolver, ResolvedRef, TemporaryRefResolver, value_from_ref},
22    seq::{Sequence, SequenceItem, sequence_item_from_event},
23    term::Term,
24    value::Value,
25};
26
27/// How much of an evaluation a caller wants to observe.
28#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
29pub enum ObserveMode {
30    /// Observe only the final value (the default).
31    #[default]
32    FinalOnly,
33    /// Stream intermediate events as they occur.
34    Events,
35    /// Collect the full event ledger alongside the final value.
36    Ledger,
37}
38
39/// A reference-based realize request: the portable form of an [`EvalRequest`].
40///
41/// Where [`EvalRequest`] carries an in-process [`Expr`] and live values, a
42/// [`RealizeRequest`] carries a [`Term`] and [`Ref`]s, so it can cross a
43/// transport boundary. The two convert via
44/// [`from_eval_request`](RealizeRequest::from_eval_request) and
45/// [`to_eval_request`](RealizeRequest::to_eval_request). See the README
46/// section "Distributed evaluation".
47///
48/// # Examples
49///
50/// ```
51/// use sim_kernel::realize::{ObserveMode, RealizeRequest};
52/// use sim_kernel::term::Term;
53/// use sim_kernel::Symbol;
54///
55/// let request = RealizeRequest::new(Term::Local(Symbol::new("x")))
56///     .observing(ObserveMode::Events);
57/// assert_eq!(request.observe, ObserveMode::Events);
58/// assert!(request.required_capabilities.is_empty());
59/// ```
60#[derive(Clone, Debug, PartialEq, Eq)]
61pub struct RealizeRequest {
62    /// The term to evaluate.
63    pub term: Term,
64    /// Optional reference to a shape the result must satisfy.
65    pub result_shape: Option<Ref>,
66    /// Capabilities the evaluation requires.
67    pub required_capabilities: Vec<CapabilityName>,
68    /// Optional wall-clock deadline for the evaluation.
69    pub deadline: Option<Duration>,
70    /// Where the request may be answered from.
71    pub consistency: Consistency,
72    /// Which evaluation discipline to run under.
73    pub mode: EvalMode,
74    /// Optional cap on the number of answers (logic mode).
75    pub answer_limit: Option<usize>,
76    /// Optional buffer size for streamed events.
77    pub buffer_limit: Option<usize>,
78    /// How much of the evaluation to observe.
79    pub observe: ObserveMode,
80}
81
82impl RealizeRequest {
83    /// Creates a request to realize `term` with all defaults.
84    pub fn new(term: Term) -> Self {
85        Self {
86            term,
87            result_shape: None,
88            required_capabilities: Vec::new(),
89            deadline: None,
90            consistency: Consistency::default(),
91            mode: EvalMode::default(),
92            answer_limit: None,
93            buffer_limit: None,
94            observe: ObserveMode::default(),
95        }
96    }
97
98    /// Sets the [`ObserveMode`], returning the request (builder style).
99    pub fn observing(mut self, observe: ObserveMode) -> Self {
100        self.observe = observe;
101        self
102    }
103
104    /// Adds a required capability, returning the request (builder style).
105    pub fn requiring(mut self, capability: CapabilityName) -> Self {
106        self.required_capabilities.push(capability);
107        self
108    }
109
110    /// Builds a [`RealizeRequest`] from an in-process [`EvalRequest`].
111    ///
112    /// Live values become [`Ref`]s and the [`Expr`] becomes a [`Term`] so the
113    /// request can cross a transport boundary.
114    pub fn from_eval_request(cx: &mut Cx, request: &EvalRequest) -> Result<Self> {
115        Ok(Self {
116            term: term_from_eval_expr(cx, &request.expr)?,
117            result_shape: request
118                .result_shape
119                .as_ref()
120                .map(|shape| handle_ref_for_value(cx, shape))
121                .transpose()?,
122            required_capabilities: request.required_capabilities.clone(),
123            deadline: request.deadline,
124            consistency: request.consistency,
125            mode: request.mode,
126            answer_limit: request.answer_limit,
127            buffer_limit: request.stream_buffer,
128            observe: observe_from_eval_request(request),
129        })
130    }
131
132    /// Resolves this request back into an in-process [`EvalRequest`].
133    ///
134    /// The inverse of [`from_eval_request`](RealizeRequest::from_eval_request):
135    /// [`Term`] and [`Ref`]s are resolved against `cx` into an [`Expr`] and
136    /// live values.
137    pub fn to_eval_request(&self, cx: &mut Cx) -> Result<EvalRequest> {
138        Ok(EvalRequest {
139            expr: eval_expr_from_term(cx, &self.term)?,
140            result_shape: self
141                .result_shape
142                .as_ref()
143                .map(|reference| shape_from_ref(cx, reference))
144                .transpose()?,
145            required_capabilities: self.required_capabilities.clone(),
146            deadline: self.deadline,
147            consistency: self.consistency,
148            mode: self.mode,
149            answer_limit: self.answer_limit,
150            stream_buffer: self.buffer_limit,
151            stream: self.observe == ObserveMode::Events,
152            trace: self.observe != ObserveMode::FinalOnly,
153        })
154    }
155}
156
157// sim-non-citizen(reason = "buffered event-source handle; descriptor data is the event sequence payload", kind = "handle", descriptor = "core/EventSource")
158/// An in-memory [`EventSource`] backed by a buffered queue of events.
159///
160/// Returned by [`realize_events`] to replay a completed evaluation's events
161/// (diagnostics, trace, final value, done) without a live transport.
162#[derive(Debug)]
163pub struct BufferedEventSource {
164    events: Mutex<VecDeque<Event>>,
165}
166
167impl BufferedEventSource {
168    /// Wraps an ordered list of events as a drainable source.
169    pub fn new(events: Vec<Event>) -> Self {
170        Self {
171            events: Mutex::new(events.into()),
172        }
173    }
174}
175
176impl EventSource for BufferedEventSource {
177    fn next(&self, _cx: &mut Cx) -> Result<Option<Event>> {
178        Ok(self
179            .events
180            .lock()
181            .map_err(|_| Error::PoisonedLock("event source"))?
182            .pop_front())
183    }
184
185    fn close(&self, _cx: &mut Cx) -> Result<()> {
186        self.events
187            .lock()
188            .map_err(|_| Error::PoisonedLock("event source"))?
189            .clear();
190        Ok(())
191    }
192}
193
194impl Object for BufferedEventSource {
195    fn display(&self, _cx: &mut Cx) -> Result<String> {
196        Ok("#<event-source>".to_owned())
197    }
198
199    fn as_any(&self) -> &dyn std::any::Any {
200        self
201    }
202}
203
204impl crate::ObjectCompat for BufferedEventSource {
205    fn class(&self, cx: &mut Cx) -> Result<ClassRef> {
206        cx.factory().class_stub(
207            CORE_SEQUENCE_CLASS_ID,
208            Symbol::qualified("core", "EventSource"),
209        )
210    }
211    fn as_sequence(&self) -> Option<&dyn Sequence> {
212        Some(self)
213    }
214}
215
216impl Sequence for BufferedEventSource {
217    fn next_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
218        while let Some(event) = EventSource::next(self, cx)? {
219            let done = matches!(event.kind, EventKind::Done);
220            if let Some(item) = sequence_item_from_event(cx, event)? {
221                return Ok(Some(item));
222            }
223            if done {
224                return Ok(None);
225            }
226        }
227        Ok(None)
228    }
229
230    fn close(&self, cx: &mut Cx) -> Result<()> {
231        EventSource::close(self, cx)
232    }
233
234    fn peek_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
235        let events = self
236            .events
237            .lock()
238            .map_err(|_| Error::PoisonedLock("event source"))?
239            .iter()
240            .cloned()
241            .collect::<Vec<_>>();
242        for event in events {
243            let done = matches!(event.kind, EventKind::Done);
244            if let Some(item) = sequence_item_from_event(cx, event)? {
245                return Ok(Some(item));
246            }
247            if done {
248                return Ok(None);
249            }
250        }
251        Ok(None)
252    }
253
254    fn is_done(&self, _cx: &mut Cx) -> Result<bool> {
255        let events = self
256            .events
257            .lock()
258            .map_err(|_| Error::PoisonedLock("event source"))?;
259        for event in events.iter() {
260            match event.kind {
261                EventKind::Chunk { .. } | EventKind::Final(_) | EventKind::Failed(_) => {
262                    return Ok(false);
263                }
264                EventKind::Done => return Ok(true),
265                _ => {}
266            }
267        }
268        Ok(true)
269    }
270}
271
272/// Realizes `request` against `target` and returns its events as a source.
273///
274/// Runs the evaluation through the [`EvalFabric`], records the resulting
275/// diagnostics, optional trace, final value, and a terminating `done` into an
276/// event ledger, and hands them back as a [`BufferedEventSource`].
277pub fn realize_events(
278    cx: &mut Cx,
279    target: &dyn EvalFabric,
280    request: EvalRequest,
281) -> Result<BufferedEventSource> {
282    let eventful_request = RealizeRequest::from_eval_request(cx, &request)?;
283    let request_ref = ref_for_realize_request(cx, &eventful_request)?;
284    let run = Ref::Handle(HandleId::fresh());
285    let mut ledger = EventLedger::new();
286    ledger.started(run.clone(), request_ref)?;
287
288    let reply = target.realize(cx, request)?;
289    for diagnostic in &reply.diagnostics {
290        ledger.push(run.clone(), EventKind::Diagnostic(diagnostic.clone()))?;
291    }
292    if let Some(trace) = &reply.trace {
293        ledger.push(
294            run.clone(),
295            EventKind::Trace(handle_ref_for_value(cx, trace)?),
296        )?;
297    }
298    ledger.final_value(run.clone(), handle_ref_for_value(cx, &reply.value)?)?;
299    ledger.done(run.clone())?;
300
301    Ok(BufferedEventSource::new(
302        ledger.events_for_run(&run).to_vec(),
303    ))
304}
305
306/// Realizes `request` against `target` and collects only the final reply.
307///
308/// A convenience over [`realize_events`] plus [`drain_events_to_reply`] for
309/// callers that want the [`EvalReply`] rather than the event stream.
310pub fn realize_final(
311    cx: &mut Cx,
312    target: &dyn EvalFabric,
313    request: EvalRequest,
314) -> Result<EvalReply> {
315    let events = realize_events(cx, target, request)?;
316    drain_events_to_reply(cx, &events)
317}
318
319/// Drains an [`EventSource`] into a single [`EvalReply`].
320///
321/// Accumulates diagnostics and an optional trace, captures the final value,
322/// and stops at `done`. A `Failed` event is turned into an error, and a
323/// stream that ends without a final value is an error.
324pub fn drain_events_to_reply(cx: &mut Cx, source: &dyn EventSource) -> Result<EvalReply> {
325    let mut diagnostics = Vec::new();
326    let mut trace = None;
327    let mut value = None;
328
329    while let Some(event) = source.next(cx)? {
330        match event.kind {
331            EventKind::Diagnostic(diagnostic) => diagnostics.push(diagnostic),
332            EventKind::Trace(reference) => trace = Some(value_from_ref(cx, &reference)?),
333            EventKind::Final(reference) => value = Some(value_from_ref(cx, &reference)?),
334            EventKind::Failed(reference) => return Err(error_from_failed_ref(cx, &reference)),
335            EventKind::Done => break,
336            EventKind::Started { .. }
337            | EventKind::Claim { .. }
338            | EventKind::Chunk { .. }
339            | EventKind::EffectRequested { .. }
340            | EventKind::EffectResolved { .. }
341            | EventKind::Capture { .. }
342            | EventKind::Card { .. } => {}
343        }
344    }
345
346    Ok(EvalReply {
347        value: value
348            .ok_or_else(|| Error::Eval("eventful realize ended without Final".to_owned()))?,
349        diagnostics,
350        trace,
351    })
352}
353
354fn observe_from_eval_request(request: &EvalRequest) -> ObserveMode {
355    if request.stream {
356        ObserveMode::Events
357    } else if request.trace {
358        ObserveMode::Ledger
359    } else {
360        ObserveMode::FinalOnly
361    }
362}
363
364fn term_from_eval_expr(cx: &mut Cx, expr: &Expr) -> Result<Term> {
365    match Term::try_from(expr.clone()) {
366        Ok(term) => Ok(term),
367        Err(_) => {
368            let value = cx.factory().expr(expr.clone())?;
369            Ok(Term::Ref(handle_ref_for_value(cx, &value)?))
370        }
371    }
372}
373
374fn eval_expr_from_term(cx: &mut Cx, term: &Term) -> Result<Expr> {
375    let Term::Ref(reference) = term else {
376        return Ok(Expr::from(term.clone()));
377    };
378    match TemporaryRefResolver::new().resolve_ref(cx, reference)? {
379        ResolvedRef::Symbol(symbol) => Ok(Expr::Symbol(symbol)),
380        ResolvedRef::Datum(datum) => Ok(Expr::from(datum)),
381        ResolvedRef::Value(value) => value.object().as_expr(cx),
382        ResolvedRef::Coordinate(_) | ResolvedRef::Missing(_) => Ok(Expr::from(term.clone())),
383    }
384}
385
386fn shape_from_ref(cx: &mut Cx, reference: &Ref) -> Result<ShapeRef> {
387    match TemporaryRefResolver::new().resolve_ref(cx, reference)? {
388        ResolvedRef::Symbol(symbol) => cx.resolve_shape(&symbol),
389        ResolvedRef::Value(value) => {
390            if value.object().as_shape().is_some() {
391                Ok(value)
392            } else if let Some(class) = value.object().as_class() {
393                class.instance_shape(cx)
394            } else {
395                Err(Error::TypeMismatch {
396                    expected: "shape ref",
397                    found: "non-shape",
398                })
399            }
400        }
401        ResolvedRef::Datum(_) => Err(Error::TypeMismatch {
402            expected: "shape ref",
403            found: "non-shape",
404        }),
405        ResolvedRef::Coordinate(_) | ResolvedRef::Missing(_) => Err(Error::Eval(format!(
406            "unresolved result shape ref {reference:?}"
407        ))),
408    }
409}
410
411fn ref_for_realize_request(cx: &mut Cx, request: &RealizeRequest) -> Result<Ref> {
412    let value = realize_request_value(cx, request)?;
413    handle_ref_for_value(cx, &value)
414}
415
416fn realize_request_value(cx: &mut Cx, request: &RealizeRequest) -> Result<Value> {
417    let capabilities = request
418        .required_capabilities
419        .iter()
420        .map(|capability| cx.factory().string(capability.as_str().to_owned()))
421        .collect::<Result<Vec<_>>>()?;
422    let term = cx.factory().expr(Expr::from(request.term.clone()))?;
423    let result_shape = optional_ref_value(cx, request.result_shape.as_ref())?;
424    let requires = cx.factory().list(capabilities)?;
425    let deadline = match request.deadline {
426        Some(deadline) => cx.factory().string(format!("{}ms", deadline.as_millis()))?,
427        None => cx.factory().nil()?,
428    };
429    let consistency = cx.factory().symbol(request.consistency.as_symbol())?;
430    let mode = cx.factory().symbol(request.mode.as_symbol())?;
431    let answer_limit = optional_usize_value(cx, request.answer_limit)?;
432    let buffer_limit = optional_usize_value(cx, request.buffer_limit)?;
433    let observe = cx.factory().symbol(observe_symbol(request.observe))?;
434
435    cx.factory().table(vec![
436        (Symbol::new("term"), term),
437        (Symbol::new("result-shape"), result_shape),
438        (Symbol::new("requires"), requires),
439        (Symbol::new("deadline"), deadline),
440        (Symbol::new("consistency"), consistency),
441        (Symbol::new("mode"), mode),
442        (Symbol::new("answer-limit"), answer_limit),
443        (Symbol::new("buffer-limit"), buffer_limit),
444        (Symbol::new("observe"), observe),
445    ])
446}
447
448fn optional_ref_value(cx: &mut Cx, reference: Option<&Ref>) -> Result<Value> {
449    match reference {
450        Some(reference) => cx.factory().expr(Expr::from(Term::Ref(reference.clone()))),
451        None => cx.factory().nil(),
452    }
453}
454
455fn optional_usize_value(cx: &mut Cx, value: Option<usize>) -> Result<Value> {
456    match value {
457        Some(value) => cx.factory().string(value.to_string()),
458        None => cx.factory().nil(),
459    }
460}
461
462fn observe_symbol(observe: ObserveMode) -> Symbol {
463    Symbol::new(match observe {
464        ObserveMode::FinalOnly => "final-only",
465        ObserveMode::Events => "events",
466        ObserveMode::Ledger => "ledger",
467    })
468}
469
470fn handle_ref_for_value(cx: &mut Cx, value: &Value) -> Result<Ref> {
471    Ok(Ref::Handle(cx.handles_mut().intern(value.clone())))
472}
473
474fn error_from_failed_ref(cx: &mut Cx, reference: &Ref) -> Error {
475    match value_from_ref(cx, reference) {
476        Ok(value) => match value.object().display(cx) {
477            Ok(message) => Error::Eval(message),
478            Err(err) => err,
479        },
480        Err(err) => err,
481    }
482}