1use std::{collections::VecDeque, sync::Mutex, time::Duration};
8
9use crate::{
10 capability::CapabilityName,
11 env::Cx,
12 error::{Error, Result},
13 eval::{Consistency, EvalFabric, EvalMode, EvalReply, EvalRequest},
14 event::{Event, EventKind, EventSource},
15 event_ledger::EventLedger,
16 expr::Expr,
17 handle_store::HandleStore,
18 id::{CORE_SEQUENCE_CLASS_ID, Symbol},
19 object::{ClassRef, Object, ShapeRef},
20 ref_id::{HandleId, Ref},
21 ref_resolver::{RefResolver, ResolvedRef, TemporaryRefResolver, value_from_ref},
22 seq::{Sequence, SequenceItem, sequence_item_from_event},
23 term::Term,
24 value::Value,
25};
26
/// How much of a realization run the caller wants to observe.
///
/// The variant determines the `stream` and `trace` flags that
/// `RealizeRequest::to_eval_request` writes into the resulting `EvalRequest`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ObserveMode {
    /// Neither `stream` nor `trace` is requested. This is the default mode.
    #[default]
    FinalOnly,
    /// Both `stream` and `trace` are requested.
    Events,
    /// Only `trace` is requested.
    Ledger,
}
38
39#[derive(Clone, Debug, PartialEq, Eq)]
61pub struct RealizeRequest {
62 pub term: Term,
64 pub result_shape: Option<Ref>,
66 pub required_capabilities: Vec<CapabilityName>,
68 pub deadline: Option<Duration>,
70 pub consistency: Consistency,
72 pub mode: EvalMode,
74 pub answer_limit: Option<usize>,
76 pub buffer_limit: Option<usize>,
78 pub observe: ObserveMode,
80}
81
82impl RealizeRequest {
83 pub fn new(term: Term) -> Self {
85 Self {
86 term,
87 result_shape: None,
88 required_capabilities: Vec::new(),
89 deadline: None,
90 consistency: Consistency::default(),
91 mode: EvalMode::default(),
92 answer_limit: None,
93 buffer_limit: None,
94 observe: ObserveMode::default(),
95 }
96 }
97
98 pub fn observing(mut self, observe: ObserveMode) -> Self {
100 self.observe = observe;
101 self
102 }
103
104 pub fn requiring(mut self, capability: CapabilityName) -> Self {
106 self.required_capabilities.push(capability);
107 self
108 }
109
110 pub fn from_eval_request(cx: &mut Cx, request: &EvalRequest) -> Result<Self> {
115 Ok(Self {
116 term: term_from_eval_expr(cx, &request.expr)?,
117 result_shape: request
118 .result_shape
119 .as_ref()
120 .map(|shape| handle_ref_for_value(cx, shape))
121 .transpose()?,
122 required_capabilities: request.required_capabilities.clone(),
123 deadline: request.deadline,
124 consistency: request.consistency,
125 mode: request.mode,
126 answer_limit: request.answer_limit,
127 buffer_limit: request.stream_buffer,
128 observe: observe_from_eval_request(request),
129 })
130 }
131
132 pub fn to_eval_request(&self, cx: &mut Cx) -> Result<EvalRequest> {
138 Ok(EvalRequest {
139 expr: eval_expr_from_term(cx, &self.term)?,
140 result_shape: self
141 .result_shape
142 .as_ref()
143 .map(|reference| shape_from_ref(cx, reference))
144 .transpose()?,
145 required_capabilities: self.required_capabilities.clone(),
146 deadline: self.deadline,
147 consistency: self.consistency,
148 mode: self.mode,
149 answer_limit: self.answer_limit,
150 stream_buffer: self.buffer_limit,
151 stream: self.observe == ObserveMode::Events,
152 trace: self.observe != ObserveMode::FinalOnly,
153 })
154 }
155}
156
157#[derive(Debug)]
163pub struct BufferedEventSource {
164 events: Mutex<VecDeque<Event>>,
165}
166
167impl BufferedEventSource {
168 pub fn new(events: Vec<Event>) -> Self {
170 Self {
171 events: Mutex::new(events.into()),
172 }
173 }
174}
175
176impl EventSource for BufferedEventSource {
177 fn next(&self, _cx: &mut Cx) -> Result<Option<Event>> {
178 Ok(self
179 .events
180 .lock()
181 .map_err(|_| Error::PoisonedLock("event source"))?
182 .pop_front())
183 }
184
185 fn close(&self, _cx: &mut Cx) -> Result<()> {
186 self.events
187 .lock()
188 .map_err(|_| Error::PoisonedLock("event source"))?
189 .clear();
190 Ok(())
191 }
192}
193
194impl Object for BufferedEventSource {
195 fn display(&self, _cx: &mut Cx) -> Result<String> {
196 Ok("#<event-source>".to_owned())
197 }
198
199 fn as_any(&self) -> &dyn std::any::Any {
200 self
201 }
202}
203
204impl crate::ObjectCompat for BufferedEventSource {
205 fn class(&self, cx: &mut Cx) -> Result<ClassRef> {
206 cx.factory().class_stub(
207 CORE_SEQUENCE_CLASS_ID,
208 Symbol::qualified("core", "EventSource"),
209 )
210 }
211 fn as_sequence(&self) -> Option<&dyn Sequence> {
212 Some(self)
213 }
214}
215
216impl Sequence for BufferedEventSource {
217 fn next_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
218 while let Some(event) = EventSource::next(self, cx)? {
219 let done = matches!(event.kind, EventKind::Done);
220 if let Some(item) = sequence_item_from_event(cx, event)? {
221 return Ok(Some(item));
222 }
223 if done {
224 return Ok(None);
225 }
226 }
227 Ok(None)
228 }
229
230 fn close(&self, cx: &mut Cx) -> Result<()> {
231 EventSource::close(self, cx)
232 }
233
234 fn peek_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
235 let events = self
236 .events
237 .lock()
238 .map_err(|_| Error::PoisonedLock("event source"))?
239 .iter()
240 .cloned()
241 .collect::<Vec<_>>();
242 for event in events {
243 let done = matches!(event.kind, EventKind::Done);
244 if let Some(item) = sequence_item_from_event(cx, event)? {
245 return Ok(Some(item));
246 }
247 if done {
248 return Ok(None);
249 }
250 }
251 Ok(None)
252 }
253
254 fn is_done(&self, _cx: &mut Cx) -> Result<bool> {
255 let events = self
256 .events
257 .lock()
258 .map_err(|_| Error::PoisonedLock("event source"))?;
259 for event in events.iter() {
260 match event.kind {
261 EventKind::Chunk { .. } | EventKind::Final(_) | EventKind::Failed(_) => {
262 return Ok(false);
263 }
264 EventKind::Done => return Ok(true),
265 _ => {}
266 }
267 }
268 Ok(true)
269 }
270}
271
272pub fn realize_events(
278 cx: &mut Cx,
279 target: &dyn EvalFabric,
280 request: EvalRequest,
281) -> Result<BufferedEventSource> {
282 let eventful_request = RealizeRequest::from_eval_request(cx, &request)?;
283 let request_ref = ref_for_realize_request(cx, &eventful_request)?;
284 let run = Ref::Handle(HandleId::fresh());
285 let mut ledger = EventLedger::new();
286 ledger.started(run.clone(), request_ref)?;
287
288 let reply = target.realize(cx, request)?;
289 for diagnostic in &reply.diagnostics {
290 ledger.push(run.clone(), EventKind::Diagnostic(diagnostic.clone()))?;
291 }
292 if let Some(trace) = &reply.trace {
293 ledger.push(
294 run.clone(),
295 EventKind::Trace(handle_ref_for_value(cx, trace)?),
296 )?;
297 }
298 ledger.final_value(run.clone(), handle_ref_for_value(cx, &reply.value)?)?;
299 ledger.done(run.clone())?;
300
301 Ok(BufferedEventSource::new(
302 ledger.events_for_run(&run).to_vec(),
303 ))
304}
305
306pub fn realize_final(
311 cx: &mut Cx,
312 target: &dyn EvalFabric,
313 request: EvalRequest,
314) -> Result<EvalReply> {
315 let events = realize_events(cx, target, request)?;
316 drain_events_to_reply(cx, &events)
317}
318
319pub fn drain_events_to_reply(cx: &mut Cx, source: &dyn EventSource) -> Result<EvalReply> {
325 let mut diagnostics = Vec::new();
326 let mut trace = None;
327 let mut value = None;
328
329 while let Some(event) = source.next(cx)? {
330 match event.kind {
331 EventKind::Diagnostic(diagnostic) => diagnostics.push(diagnostic),
332 EventKind::Trace(reference) => trace = Some(value_from_ref(cx, &reference)?),
333 EventKind::Final(reference) => value = Some(value_from_ref(cx, &reference)?),
334 EventKind::Failed(reference) => return Err(error_from_failed_ref(cx, &reference)),
335 EventKind::Done => break,
336 EventKind::Started { .. }
337 | EventKind::Claim { .. }
338 | EventKind::Chunk { .. }
339 | EventKind::EffectRequested { .. }
340 | EventKind::EffectResolved { .. }
341 | EventKind::Capture { .. }
342 | EventKind::Card { .. } => {}
343 }
344 }
345
346 Ok(EvalReply {
347 value: value
348 .ok_or_else(|| Error::Eval("eventful realize ended without Final".to_owned()))?,
349 diagnostics,
350 trace,
351 })
352}
353
354fn observe_from_eval_request(request: &EvalRequest) -> ObserveMode {
355 if request.stream {
356 ObserveMode::Events
357 } else if request.trace {
358 ObserveMode::Ledger
359 } else {
360 ObserveMode::FinalOnly
361 }
362}
363
364fn term_from_eval_expr(cx: &mut Cx, expr: &Expr) -> Result<Term> {
365 match Term::try_from(expr.clone()) {
366 Ok(term) => Ok(term),
367 Err(_) => {
368 let value = cx.factory().expr(expr.clone())?;
369 Ok(Term::Ref(handle_ref_for_value(cx, &value)?))
370 }
371 }
372}
373
374fn eval_expr_from_term(cx: &mut Cx, term: &Term) -> Result<Expr> {
375 let Term::Ref(reference) = term else {
376 return Ok(Expr::from(term.clone()));
377 };
378 match TemporaryRefResolver::new().resolve_ref(cx, reference)? {
379 ResolvedRef::Symbol(symbol) => Ok(Expr::Symbol(symbol)),
380 ResolvedRef::Datum(datum) => Ok(Expr::from(datum)),
381 ResolvedRef::Value(value) => value.object().as_expr(cx),
382 ResolvedRef::Coordinate(_) | ResolvedRef::Missing(_) => Ok(Expr::from(term.clone())),
383 }
384}
385
386fn shape_from_ref(cx: &mut Cx, reference: &Ref) -> Result<ShapeRef> {
387 match TemporaryRefResolver::new().resolve_ref(cx, reference)? {
388 ResolvedRef::Symbol(symbol) => cx.resolve_shape(&symbol),
389 ResolvedRef::Value(value) => {
390 if value.object().as_shape().is_some() {
391 Ok(value)
392 } else if let Some(class) = value.object().as_class() {
393 class.instance_shape(cx)
394 } else {
395 Err(Error::TypeMismatch {
396 expected: "shape ref",
397 found: "non-shape",
398 })
399 }
400 }
401 ResolvedRef::Datum(_) => Err(Error::TypeMismatch {
402 expected: "shape ref",
403 found: "non-shape",
404 }),
405 ResolvedRef::Coordinate(_) | ResolvedRef::Missing(_) => Err(Error::Eval(format!(
406 "unresolved result shape ref {reference:?}"
407 ))),
408 }
409}
410
411fn ref_for_realize_request(cx: &mut Cx, request: &RealizeRequest) -> Result<Ref> {
412 let value = realize_request_value(cx, request)?;
413 handle_ref_for_value(cx, &value)
414}
415
416fn realize_request_value(cx: &mut Cx, request: &RealizeRequest) -> Result<Value> {
417 let capabilities = request
418 .required_capabilities
419 .iter()
420 .map(|capability| cx.factory().string(capability.as_str().to_owned()))
421 .collect::<Result<Vec<_>>>()?;
422 let term = cx.factory().expr(Expr::from(request.term.clone()))?;
423 let result_shape = optional_ref_value(cx, request.result_shape.as_ref())?;
424 let requires = cx.factory().list(capabilities)?;
425 let deadline = match request.deadline {
426 Some(deadline) => cx.factory().string(format!("{}ms", deadline.as_millis()))?,
427 None => cx.factory().nil()?,
428 };
429 let consistency = cx.factory().symbol(request.consistency.as_symbol())?;
430 let mode = cx.factory().symbol(request.mode.as_symbol())?;
431 let answer_limit = optional_usize_value(cx, request.answer_limit)?;
432 let buffer_limit = optional_usize_value(cx, request.buffer_limit)?;
433 let observe = cx.factory().symbol(observe_symbol(request.observe))?;
434
435 cx.factory().table(vec![
436 (Symbol::new("term"), term),
437 (Symbol::new("result-shape"), result_shape),
438 (Symbol::new("requires"), requires),
439 (Symbol::new("deadline"), deadline),
440 (Symbol::new("consistency"), consistency),
441 (Symbol::new("mode"), mode),
442 (Symbol::new("answer-limit"), answer_limit),
443 (Symbol::new("buffer-limit"), buffer_limit),
444 (Symbol::new("observe"), observe),
445 ])
446}
447
448fn optional_ref_value(cx: &mut Cx, reference: Option<&Ref>) -> Result<Value> {
449 match reference {
450 Some(reference) => cx.factory().expr(Expr::from(Term::Ref(reference.clone()))),
451 None => cx.factory().nil(),
452 }
453}
454
455fn optional_usize_value(cx: &mut Cx, value: Option<usize>) -> Result<Value> {
456 match value {
457 Some(value) => cx.factory().string(value.to_string()),
458 None => cx.factory().nil(),
459 }
460}
461
462fn observe_symbol(observe: ObserveMode) -> Symbol {
463 Symbol::new(match observe {
464 ObserveMode::FinalOnly => "final-only",
465 ObserveMode::Events => "events",
466 ObserveMode::Ledger => "ledger",
467 })
468}
469
470fn handle_ref_for_value(cx: &mut Cx, value: &Value) -> Result<Ref> {
471 Ok(Ref::Handle(cx.handles_mut().intern(value.clone())))
472}
473
474fn error_from_failed_ref(cx: &mut Cx, reference: &Ref) -> Error {
475 match value_from_ref(cx, reference) {
476 Ok(value) => match value.object().display(cx) {
477 Ok(message) => Error::Eval(message),
478 Err(err) => err,
479 },
480 Err(err) => err,
481 }
482}