1use crate::{
7 capability::CapabilityName,
8 datum::Datum,
9 datum_store::DatumStore,
10 env::Cx,
11 error::{Error, Result},
12 expr::NumberLiteral,
13 id::Symbol,
14 ref_id::{ContentId, Coordinate, HandleId, Ref},
15 term::OpKey,
16};
17
/// Version tag written into the `version` field of every effect replay
/// preimage (see `effect_replay_preimage`), so it participates in the content
/// id derived from that preimage.
pub const EFFECT_REPLAY_VERSION: &str = "sim6-effect-replay-v1";
20
/// Description of a single effect request, as consumed by `resolve_effect`.
///
/// Every field except `id` and `replay_key` is folded into the replay
/// preimage built by `effect_replay_preimage`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Effect {
    /// Identity of this request. `Effect::new` assigns a fresh handle, and the
    /// ledger calls made by `resolve_effect` are keyed by this value. It is not
    /// part of the replay preimage.
    pub id: Ref,
    /// Effect kind symbol (see the `effect_*_kind` helpers).
    pub kind: Symbol,
    /// Written to the preimage as the `subject` field.
    // NOTE(review): presumably the thing the effect acts on — confirm with callers.
    pub subject: Ref,
    /// Written to the preimage as the `input` field.
    pub input: Ref,
    /// Written to the preimage as the `result-shape` field.
    // NOTE(review): presumably describes the expected result; nothing in this
    // file validates results against it — confirm where it is enforced.
    pub result_shape: Ref,
    /// Written to the preimage as the `resume-op` field.
    // NOTE(review): presumably the operation used to resume after the effect
    // resolves — confirm; this file only hashes it.
    pub resume_op: OpKey,
    /// Written to the preimage as the `abort-op` field.
    // NOTE(review): presumably the operation used when the effect is aborted —
    // confirm; this file only hashes it.
    pub abort_op: OpKey,
    /// Capabilities that `resolve_effect` checks with `Cx::require_all` before
    /// the performer runs. Sorted and deduplicated when written to the preimage.
    pub requires: Vec<CapabilityName>,
    /// Cached replay key; `None` until set by `with_replay_key`,
    /// `ensure_replay_key`, or `resolve_effect`.
    pub replay_key: Option<ContentId>,
}
43
44impl Effect {
45 pub fn new(
47 kind: Symbol,
48 subject: Ref,
49 input: Ref,
50 result_shape: Ref,
51 resume_op: OpKey,
52 abort_op: OpKey,
53 ) -> Self {
54 Self {
55 id: Ref::Handle(HandleId::fresh()),
56 kind,
57 subject,
58 input,
59 result_shape,
60 resume_op,
61 abort_op,
62 requires: Vec::new(),
63 replay_key: None,
64 }
65 }
66
67 pub fn with_id(mut self, id: Ref) -> Self {
69 self.id = id;
70 self
71 }
72
73 pub fn requiring(mut self, capability: CapabilityName) -> Self {
75 self.requires.push(capability);
76 self
77 }
78
79 pub fn with_requirements(mut self, requires: Vec<CapabilityName>) -> Self {
81 self.requires = requires;
82 self
83 }
84
85 pub fn with_replay_key(mut self, implementation: Option<Ref>) -> Result<Self> {
87 self.replay_key = Some(effect_replay_key(&self, implementation)?);
88 Ok(self)
89 }
90
91 pub fn ensure_replay_key(&mut self, implementation: Option<Ref>) -> Result<ContentId> {
93 if let Some(key) = &self.replay_key {
94 return Ok(key.clone());
95 }
96 let key = effect_replay_key(self, implementation)?;
97 self.replay_key = Some(key.clone());
98 Ok(key)
99 }
100}
101
/// Record of one effect's lifecycle.
// NOTE(review): this file never constructs the struct; the unit tests read it
// through `cx.effect_ledger().records()`. Field meanings below are inferred
// from names and from what those tests assert — confirm against the ledger.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EffectRecord {
    /// Reference to the effect this record describes (presumably `Effect::id`).
    pub effect: Ref,
    /// Reference to the event emitted when the effect was requested.
    pub requested_event: Ref,
    /// Reference to the event emitted on resolution, if any (presumably `None`
    /// while the effect is pending).
    pub resolved_event: Option<Ref>,
    /// Result of the effect; the tests observe `Some(result)` after a
    /// successful `resolve_effect`.
    pub result: Option<Ref>,
    /// The tests observe `true` after `resolve_effect` rejects the effect for a
    /// missing capability.
    pub aborted: bool,
}
116
117pub fn resolve_effect<F>(cx: &mut Cx, mut effect: Effect, perform: F) -> Result<Ref>
151where
152 F: FnOnce(&mut Cx, &Effect) -> Result<Ref>,
153{
154 let preimage = effect_replay_preimage(&effect, None);
155 let replay_key = match effect.replay_key.clone() {
156 Some(key) => key,
157 None => cx.datum_store_mut().intern(preimage)?,
158 };
159 effect.replay_key = Some(replay_key.clone());
160
161 let cassette_result = cx.with_effect_ledger(|cx, ledger| {
162 ledger.record_requested(cx.datum_store_mut(), effect.clone())?;
163 Ok(ledger.cassette_result(&replay_key).cloned())
164 })?;
165
166 if let Err(err) = cx.require_all(&effect.requires) {
167 record_effect_failure(cx, effect.id.clone(), &err)?;
168 return Err(err);
169 }
170
171 if let Some(result) = cassette_result {
172 cx.with_effect_ledger(|cx, ledger| {
173 ledger.record_resolved(cx.datum_store_mut(), effect.id.clone(), result.clone())?;
174 Ok(())
175 })?;
176 return Ok(result);
177 }
178
179 match perform(cx, &effect) {
180 Ok(result) => {
181 cx.with_effect_ledger(|cx, ledger| {
182 ledger.record_resolved(cx.datum_store_mut(), effect.id.clone(), result.clone())?;
183 Ok(())
184 })?;
185 Ok(result)
186 }
187 Err(err) => {
188 record_effect_failure(cx, effect.id, &err)?;
189 Err(err)
190 }
191 }
192}
193
194pub fn effect_replay_key(effect: &Effect, implementation: Option<Ref>) -> Result<ContentId> {
196 effect_replay_preimage(effect, implementation).content_id()
197}
198
199pub fn effect_replay_preimage(effect: &Effect, implementation: Option<Ref>) -> Datum {
201 let mut requires = effect.requires.clone();
202 requires.sort();
203 requires.dedup();
204 let mut fields = vec![
205 (
206 Symbol::new("version"),
207 Datum::String(EFFECT_REPLAY_VERSION.to_owned()),
208 ),
209 (Symbol::new("kind"), Datum::Symbol(effect.kind.clone())),
210 (Symbol::new("subject"), ref_datum(effect.subject.clone())),
211 (Symbol::new("input"), ref_datum(effect.input.clone())),
212 (
213 Symbol::new("result-shape"),
214 ref_datum(effect.result_shape.clone()),
215 ),
216 (
217 Symbol::new("resume-op"),
218 op_key_datum(effect.resume_op.clone()),
219 ),
220 (
221 Symbol::new("abort-op"),
222 op_key_datum(effect.abort_op.clone()),
223 ),
224 (
225 Symbol::new("requires"),
226 Datum::List(
227 requires
228 .into_iter()
229 .map(|capability| Datum::String(capability.as_str().to_owned()))
230 .collect(),
231 ),
232 ),
233 ];
234 if let Some(implementation) = implementation {
235 fields.push((Symbol::new("implementation"), ref_datum(implementation)));
236 }
237 Datum::Node {
238 tag: core_symbol("EffectReplayKey"),
239 fields,
240 }
241}
242
/// Returns the `tool-call` effect kind symbol (qualified with `effect` via
/// `effect_symbol`).
pub fn effect_tool_call_kind() -> Symbol {
    effect_symbol("tool-call")
}
247
/// Returns the `model-infer` effect kind symbol (qualified with `effect` via
/// `effect_symbol`).
pub fn effect_model_infer_kind() -> Symbol {
    effect_symbol("model-infer")
}
252
/// Returns the `host-process` effect kind symbol (qualified with `effect` via
/// `effect_symbol`).
pub fn effect_host_process_kind() -> Symbol {
    effect_symbol("host-process")
}
257
/// Returns the `network` effect kind symbol (qualified with `effect` via
/// `effect_symbol`).
pub fn effect_network_kind() -> Symbol {
    effect_symbol("network")
}
262
/// Returns the `filesystem` effect kind symbol (qualified with `effect` via
/// `effect_symbol`).
pub fn effect_filesystem_kind() -> Symbol {
    effect_symbol("filesystem")
}
267
/// Returns the `time` effect kind symbol (qualified with `effect` via
/// `effect_symbol`).
pub fn effect_time_kind() -> Symbol {
    effect_symbol("time")
}
272
/// Returns the `random` effect kind symbol (qualified with `effect` via
/// `effect_symbol`).
pub fn effect_random_kind() -> Symbol {
    effect_symbol("random")
}
277
/// Returns the `remote-realize` effect kind symbol (qualified with `effect`
/// via `effect_symbol`).
pub fn effect_remote_realize_kind() -> Symbol {
    effect_symbol("remote-realize")
}
282
/// Returns the `test-run` effect kind symbol (qualified with `effect` via
/// `effect_symbol`).
pub fn effect_test_run_kind() -> Symbol {
    effect_symbol("test-run")
}
287
/// Returns the `control-prompt` effect kind symbol (qualified with `effect`
/// via `effect_symbol`).
pub fn effect_control_prompt_kind() -> Symbol {
    effect_symbol("control-prompt")
}
292
/// Returns the `device-read` effect kind symbol (qualified with `effect` via
/// `effect_symbol`).
pub fn effect_device_read_kind() -> Symbol {
    effect_symbol("device-read")
}
297
/// Returns the `device-write` effect kind symbol (qualified with `effect` via
/// `effect_symbol`).
pub fn effect_device_write_kind() -> Symbol {
    effect_symbol("device-write")
}
302
/// Returns the `control-capture` effect kind symbol (qualified with `effect`
/// via `effect_symbol`).
pub fn effect_control_capture_kind() -> Symbol {
    effect_symbol("control-capture")
}
307
/// Returns the `control-abort` effect kind symbol (qualified with `effect`
/// via `effect_symbol`).
pub fn effect_control_abort_kind() -> Symbol {
    effect_symbol("control-abort")
}
312
/// Returns the `control-resume` effect kind symbol (qualified with `effect`
/// via `effect_symbol`).
pub fn effect_control_resume_kind() -> Symbol {
    effect_symbol("control-resume")
}
317
318pub fn effect_resume_op_key() -> OpKey {
320 OpKey::new(effect_symbol("control"), Symbol::new("resume"), 1)
321}
322
323pub fn effect_abort_op_key() -> OpKey {
325 OpKey::new(effect_symbol("control"), Symbol::new("abort"), 1)
326}
327
328fn record_effect_failure(cx: &mut Cx, effect: Ref, err: &Error) -> Result<()> {
329 let error_ref = error_ref(cx, err)?;
330 cx.with_effect_ledger(|cx, ledger| {
331 ledger.record_failed(cx.datum_store_mut(), effect, error_ref)?;
332 Ok(())
333 })
334}
335
336fn error_ref(cx: &mut Cx, err: &Error) -> Result<Ref> {
337 let id = cx
338 .datum_store_mut()
339 .intern(Datum::String(err.to_string()))?;
340 Ok(Ref::Content(id))
341}
342
343fn ref_datum(reference: Ref) -> Datum {
344 match reference {
345 Ref::Symbol(symbol) => Datum::Node {
346 tag: core_symbol("ref"),
347 fields: vec![
348 (Symbol::new("kind"), Datum::Symbol(core_symbol("symbol"))),
349 (Symbol::new("symbol"), Datum::Symbol(symbol)),
350 ],
351 },
352 Ref::Content(content) => Datum::Node {
353 tag: core_symbol("ref"),
354 fields: vec![
355 (Symbol::new("kind"), Datum::Symbol(core_symbol("content"))),
356 (Symbol::new("content"), content_id_datum(content)),
357 ],
358 },
359 Ref::Handle(handle) => Datum::Node {
360 tag: core_symbol("ref"),
361 fields: vec![
362 (Symbol::new("kind"), Datum::Symbol(core_symbol("handle"))),
363 (Symbol::new("id"), handle_id_datum(handle)),
364 ],
365 },
366 Ref::Coord(coordinate) => coordinate_datum(coordinate),
367 }
368}
369
370fn coordinate_datum(coordinate: Coordinate) -> Datum {
371 Datum::Node {
372 tag: core_symbol("ref"),
373 fields: vec![
374 (Symbol::new("kind"), Datum::Symbol(core_symbol("coord"))),
375 (Symbol::new("space"), Datum::Symbol(coordinate.space)),
376 (Symbol::new("ordinal"), content_id_datum(coordinate.ordinal)),
377 ],
378 }
379}
380
381fn content_id_datum(content: ContentId) -> Datum {
382 Datum::Node {
383 tag: core_symbol("content-id"),
384 fields: vec![
385 (Symbol::new("algorithm"), Datum::Symbol(content.algorithm)),
386 (Symbol::new("bytes"), Datum::Bytes(content.bytes.to_vec())),
387 ],
388 }
389}
390
391fn handle_id_datum(handle: HandleId) -> Datum {
392 Datum::Bytes(handle.0.to_be_bytes().to_vec())
393}
394
395fn op_key_datum(op: OpKey) -> Datum {
396 Datum::Node {
397 tag: core_symbol("op-key"),
398 fields: vec![
399 (Symbol::new("namespace"), Datum::Symbol(op.namespace)),
400 (Symbol::new("name"), Datum::Symbol(op.name)),
401 (
402 Symbol::new("version"),
403 Datum::Number(NumberLiteral {
404 domain: core_symbol("u16"),
405 canonical: op.version.to_string(),
406 }),
407 ),
408 ],
409 }
410}
411
/// Returns `name` as a symbol qualified with `effect`; shared by the effect
/// kind helpers and the control op keys.
fn effect_symbol(name: &str) -> Symbol {
    Symbol::qualified("effect", name)
}
415
/// Returns `name` as a symbol qualified with `core`; used for the node tags
/// and kind markers of the datum encoders in this file.
fn core_symbol(name: &str) -> Symbol {
    Symbol::qualified("core", name)
}
419
// Unit tests for replay-key derivation and for the ledger, capability, and
// cassette behavior of `resolve_effect`.
#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };

    use super::*;
    use crate::EventKind;

    use crate::testing::bare_cx as cx;

    /// Builds a tool-call effect that varies only in `input`.
    fn effect(input: Ref) -> Effect {
        Effect::new(
            effect_tool_call_kind(),
            Ref::Symbol(Symbol::qualified("test", "tool")),
            input,
            Ref::Symbol(core_symbol("Any")),
            effect_resume_op_key(),
            effect_abort_op_key(),
        )
    }

    // Each effect gets its own id from `HandleId::fresh()`, but the id is not
    // part of the preimage, so identical fields must give identical keys.
    #[test]
    fn same_replay_preimage_gives_same_key() {
        let left = effect(Ref::Symbol(Symbol::qualified("test", "input")));
        let right = effect(Ref::Symbol(Symbol::qualified("test", "input")));

        assert_eq!(
            effect_replay_key(&left, None).unwrap(),
            effect_replay_key(&right, None).unwrap()
        );
    }

    // `input` is part of the preimage, so changing it must change the key.
    #[test]
    fn changed_input_gives_different_key() {
        let left = effect(Ref::Symbol(Symbol::qualified("test", "left")));
        let right = effect(Ref::Symbol(Symbol::qualified("test", "right")));

        assert_ne!(
            effect_replay_key(&left, None).unwrap(),
            effect_replay_key(&right, None).unwrap()
        );
    }

    // Happy path: the performer's result is returned, one record holds that
    // result, and the first two run events are "requested" then "resolved".
    #[test]
    fn resolving_effect_emits_requested_and_resolved_events() {
        let mut cx = cx();
        let result = Ref::Symbol(Symbol::qualified("test", "result"));

        let actual = resolve_effect(&mut cx, effect(Ref::Symbol(Symbol::new("input"))), {
            let result = result.clone();
            move |_cx, _effect| Ok(result)
        })
        .unwrap();

        assert_eq!(actual, result);
        let records = cx.effect_ledger().records();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].result, Some(result.clone()));
        let events = cx.effect_ledger().events_for_run();
        assert!(matches!(events[0].kind, EventKind::EffectRequested { .. }));
        assert!(matches!(events[1].kind, EventKind::EffectResolved { .. }));
    }

    // The context from `bare_cx` is expected to deny `test.required`: the call
    // must fail with `CapabilityDenied`, never invoke the performer, and leave
    // an aborted record in the ledger.
    #[test]
    fn missing_capability_denies_effect_before_performer_runs() {
        let mut cx = cx();
        let calls = Arc::new(AtomicUsize::new(0));
        let err = resolve_effect(
            &mut cx,
            effect(Ref::Symbol(Symbol::new("input")))
                .requiring(CapabilityName::new("test.required")),
            {
                let calls = calls.clone();
                move |_cx, _effect| {
                    calls.fetch_add(1, Ordering::SeqCst);
                    Ok(Ref::Symbol(Symbol::new("unreachable")))
                }
            },
        )
        .unwrap_err();

        assert!(
            matches!(err, Error::CapabilityDenied { capability } if capability.as_str() == "test.required")
        );
        assert_eq!(calls.load(Ordering::SeqCst), 0);
        assert!(cx.effect_ledger().records()[0].aborted);
    }

    // A cassette entry stored under the effect's replay key must be returned
    // without invoking the performer.
    #[test]
    fn cassette_result_is_used_when_replay_key_matches() {
        let mut cx = cx();
        let mut effect = effect(Ref::Symbol(Symbol::new("input")));
        let key = effect.ensure_replay_key(None).unwrap();
        let cassette = Ref::Symbol(Symbol::qualified("test", "cassette-result"));
        cx.effect_ledger_mut()
            .insert_cassette_result(key, cassette.clone());
        let calls = Arc::new(AtomicUsize::new(0));

        let actual = resolve_effect(&mut cx, effect, {
            let calls = calls.clone();
            move |_cx, _effect| {
                calls.fetch_add(1, Ordering::SeqCst);
                Ok(Ref::Symbol(Symbol::new("performed")))
            }
        })
        .unwrap();

        assert_eq!(actual, cassette);
        assert_eq!(calls.load(Ordering::SeqCst), 0);
    }
}
532}