Skip to main content

relux_runtime/effect/
mod.rs

1pub mod registry;
2
3use std::collections::HashMap;
4use std::collections::HashSet;
5use std::sync::Arc;
6
7use tokio::sync::Mutex as TokioMutex;
8use tokio_util::sync::CancellationToken;
9
10use futures::future::join_all;
11
12use crate::RuntimeContext;
13use crate::effect::registry::EffectHandle;
14use crate::effect::registry::EffectInstanceKey;
15use crate::effect::registry::EffectRegistry;
16use crate::effect::registry::EffectSlot;
17use crate::report::result::Failure;
18use crate::vm::Vm;
19use crate::vm::context::ExecutionContext;
20use crate::vm::context::Scope;
21use crate::vm::context::ShellState;
22use relux_core::pure::Env;
23use relux_core::pure::LayeredEnv;
24use relux_core::pure::VarScope;
25use relux_ir::IrCleanupBlock;
26use relux_ir::IrEffectItem;
27use relux_ir::IrEffectStart;
28use relux_ir::IrPureLetStmt;
29
30// ─── Warning / CleanupSource ────────────────────────────────
31
/// Identifies which cleanup block a [`Warning`] originated from.
#[derive(Debug, Clone)]
pub enum CleanupSource {
    /// NOTE(review): never constructed in this module — presumably the
    /// cleanup block of a test itself; confirm against the test runner.
    Test,
    /// The cleanup block of the effect called `name`.
    Effect { name: String },
}
37
/// Non-fatal problem collected while releasing effects.
#[derive(Debug, Clone)]
pub enum Warning {
    /// A cleanup block returned a failure. Teardown is best-effort, so the
    /// failure is reported here instead of aborting the release.
    CleanupFailed {
        /// Where the failing cleanup block came from.
        source: CleanupSource,
        /// The failure the cleanup block produced.
        failure: Failure,
    },
}
45
46// ─── EffectManager ──────────────────────────────────────────
47
/// Acquires, shares and releases effect instances.
///
/// Clones share the same registry because it is held behind an `Arc`.
#[derive(Clone)]
pub struct EffectManager {
    /// Shared registry holding one slot per `EffectInstanceKey`.
    registry: Arc<EffectRegistry>,
    /// Runtime context: effect / pure-function tables, event sink, shell
    /// defaults, base environment and cancellation token.
    pub(crate) rt_ctx: RuntimeContext,
}
53
54impl EffectManager {
55    pub fn new(registry: Arc<EffectRegistry>, rt_ctx: RuntimeContext) -> Self {
56        Self { registry, rt_ctx }
57    }
58
59    /// Acquire all starts. Each start recursively acquires its own
60    /// dependencies before bootstrapping itself.
61    /// `caller_vars` contains the caller's accumulated variable scope,
62    /// allowing overlay expressions to reference the caller's `let` bindings.
63    /// `caller_env` is the layered environment visible to the caller.
64    /// Returns (key, exported-shells-map) per start declaration.
65    #[allow(clippy::type_complexity)]
66    pub fn instantiate<'a>(
67        &'a self,
68        starts: &'a [IrEffectStart],
69        caller_vars: &'a VarScope,
70        caller_env: &'a Arc<LayeredEnv>,
71    ) -> std::pin::Pin<
72        Box<
73            dyn std::future::Future<
74                    Output = Result<
75                        Vec<(EffectInstanceKey, HashMap<String, Arc<TokioMutex<Vm>>>)>,
76                        Failure,
77                    >,
78                > + Send
79                + 'a,
80        >,
81    > {
82        Box::pin(async move {
83            let mut results = Vec::with_capacity(starts.len());
84            for start in starts {
85                // Evaluate overlay first to build runtime identity key
86                let evaluated = self.eval_overlay(start, caller_vars, caller_env).await?;
87
88                // Look up effect's expect names for identity key
89                let expect_names: Vec<&str> = self
90                    .rt_ctx
91                    .tables
92                    .effects
93                    .get(start.effect())
94                    .and_then(|r| r.as_ref().ok())
95                    .map(|eff| eff.expects().iter().map(|e| e.name()).collect())
96                    .unwrap_or_default();
97                let key = EffectInstanceKey::from_expects(
98                    start.effect().clone(),
99                    &expect_names,
100                    &evaluated,
101                );
102
103                let shells = self
104                    .acquire(&key, start, caller_vars, caller_env, evaluated)
105                    .await?;
106                results.push((key, shells));
107            }
108            Ok(results)
109        })
110    }
111
112    /// Release all effects acquired during this test run.
113    /// Runs one `run_cleanup` per acquisition (matching the symmetric `acquire` calls),
114    /// concurrently. The slot mutex serializes access and refcount ensures the last
115    /// releaser triggers actual teardown + recursive dependency cleanup.
116    pub async fn cleanup_all(&self) -> Vec<Warning> {
117        let keys = self.registry.acquired_keys();
118        let futures: Vec<_> = keys.iter().map(|key| self.run_cleanup(key)).collect();
119        let results = join_all(futures).await;
120        results.into_iter().flatten().collect()
121    }
122
123    async fn acquire(
124        &self,
125        key: &EffectInstanceKey,
126        start: &IrEffectStart,
127        caller_vars: &VarScope,
128        caller_env: &Arc<LayeredEnv>,
129        evaluated_overlay: Env,
130    ) -> Result<HashMap<String, Arc<TokioMutex<Vm>>>, Failure> {
131        let slot = self.registry.slot(key);
132        let mut guard = slot.lock().await;
133
134        let result = match &mut *guard {
135            EffectSlot::Ready { refcount, handle } => {
136                *refcount += 1;
137                Ok(handle.exposed_shells())
138            }
139            EffectSlot::Failed(failure) => Err(failure.clone()),
140            EffectSlot::Empty => match self
141                .bootstrap_effect(start, caller_vars, caller_env, evaluated_overlay)
142                .await
143            {
144                Ok(handle) => {
145                    let exposed = handle.exposed_shells();
146                    *guard = EffectSlot::Ready {
147                        refcount: 1,
148                        handle,
149                    };
150                    Ok(exposed)
151                }
152                Err(failure) => {
153                    self.rt_ctx.events.emit_error("", failure.summary());
154                    *guard = EffectSlot::Failed(failure.clone());
155                    Err(failure)
156                }
157            },
158        };
159        if result.is_ok() {
160            self.registry.record_acquisition(key.clone());
161        }
162        result
163    }
164
165    async fn bootstrap_effect(
166        &self,
167        start: &IrEffectStart,
168        _caller_vars: &VarScope,
169        caller_env: &Arc<LayeredEnv>,
170        evaluated_overlay: Env,
171    ) -> Result<EffectHandle, Failure> {
172        let effect_name = start.effect().to_string();
173        self.rt_ctx.events.emit_effect_setup("", &effect_name);
174
175        let effect_result = self
176            .rt_ctx
177            .tables
178            .effects
179            .get(start.effect())
180            .ok_or_else(|| Failure::Runtime {
181                message: format!("effect {:?} not found in table", start.effect()),
182                span: None,
183                shell: None,
184            })?;
185        let effect = effect_result.as_ref().map_err(|e| Failure::Runtime {
186            message: format!("effect resolution failed: {e:?}"),
187            span: None,
188            shell: None,
189        })?;
190
191        // 1. Create layered env from pre-evaluated overlay (inherits caller's env)
192        let effect_env = Arc::new(LayeredEnv::child(caller_env.clone(), evaluated_overlay));
193
194        // 2. Create effect scope
195        let scope = Scope::Effect {
196            name: effect.name().name().to_string(),
197            vars: Arc::new(TokioMutex::new(VarScope::new())),
198            _timeout: None,
199            env: effect_env.clone(),
200        };
201
202        // 3. Evaluate effect-level lets into scope (parser enforces lets before starts)
203        for item in effect.body() {
204            if let IrEffectItem::Let { stmt, .. } = item {
205                self.eval_effect_let(stmt, &scope, &effect_env).await;
206            }
207        }
208
209        // 4. Recursively instantiate sub-dependencies (effect's vars available to sub-overlays)
210        let effect_vars = scope.vars().lock().await.clone();
211        let exported_deps = self
212            .instantiate(effect.starts(), &effect_vars, &effect_env)
213            .await?;
214
215        // 5. Build dependency shells map (alias → exported shells) and collect dep keys
216        let mut dep_shells: HashMap<String, HashMap<String, Arc<TokioMutex<Vm>>>> = HashMap::new();
217        let mut dep_keys: Vec<EffectInstanceKey> = Vec::new();
218        for (sub_start, (dep_key, exported)) in effect.starts().iter().zip(exported_deps) {
219            dep_keys.push(dep_key);
220            if let Some(alias) = sub_start.alias() {
221                dep_shells.insert(alias.to_string(), exported);
222            }
223        }
224
225        // 5b. Reset imported VMs
226        let mut reset_seen = HashSet::new();
227        for shells_map in dep_shells.values() {
228            for vm_arc in shells_map.values() {
229                let ptr = Arc::as_ptr(vm_arc) as usize;
230                if reset_seen.insert(ptr) {
231                    vm_arc.lock().await.reset_for_export(scope.clone());
232                }
233            }
234        }
235
236        // Build local shells map, pre-populated with aliased dependency shells.
237        // When a dependency is aliased (e.g. `start SetupDb as db`), its exported
238        // shells are accessible by alias in the effect body (`shell db { ... }`
239        // reuses the dependency's shell).
240        let mut shells: HashMap<String, Arc<TokioMutex<Vm>>> = HashMap::new();
241        for (alias, dep_exported) in &dep_shells {
242            if dep_exported.len() == 1 {
243                let vm_arc = dep_exported.values().next().unwrap().clone();
244                self.rt_ctx
245                    .events
246                    .emit_shell_alias(alias, vm_arc.lock().await.current_name());
247                shells.insert(alias.clone(), vm_arc);
248            }
249        }
250
251        // 6. Walk IrEffectItems (lets already evaluated, starts already instantiated)
252        let mut cleanup_block = None;
253        for item in effect.body() {
254            match item {
255                IrEffectItem::Comment { .. }
256                | IrEffectItem::Expect { .. }
257                | IrEffectItem::Start { .. }
258                | IrEffectItem::Expose { .. }
259                | IrEffectItem::Let { .. } => continue,
260                IrEffectItem::Shell { block, .. } => {
261                    if let Some(qualifier) = block.qualifier() {
262                        // Qualified: alias.shell { ... }
263                        let alias = qualifier.name();
264                        let shell_name = block.name().name();
265                        let display = format!("{alias}.{shell_name}");
266                        self.rt_ctx.events.emit_shell_switch(&display);
267                        let dep = dep_shells.get(alias).ok_or_else(|| Failure::Runtime {
268                            message: format!("unknown effect alias `{alias}`"),
269                            span: None,
270                            shell: None,
271                        })?;
272                        let vm_arc = dep.get(shell_name).ok_or_else(|| Failure::Runtime {
273                            message: format!(
274                                "effect alias `{alias}` does not expose shell `{shell_name}`"
275                            ),
276                            span: None,
277                            shell: None,
278                        })?;
279                        let mut vm = vm_arc.lock().await;
280                        self.rt_ctx.events.emit_shell_switch(vm.current_name());
281                        vm.exec_stmts(block.body()).await?;
282                    } else {
283                        // Unqualified: shell name { ... }
284                        let name = block.name().name().to_string();
285                        self.rt_ctx.events.emit_shell_switch(&name);
286                        if !shells.contains_key(&name) {
287                            let shell_state = ShellState::new(name.clone(), None);
288                            let ctx = ExecutionContext::new(
289                                scope.clone(),
290                                shell_state,
291                                self.rt_ctx.shell.default_timeout.clone(),
292                                self.rt_ctx.env.clone(),
293                            );
294                            let vm = Vm::new(name.clone(), ctx, &self.rt_ctx).await?;
295                            shells.insert(name.clone(), Arc::new(TokioMutex::new(vm)));
296                        }
297                        let vm_arc = shells.get(&name).expect("shell just inserted above");
298                        let mut vm = vm_arc.lock().await;
299                        let display_name = vm.current_name().to_string();
300                        self.rt_ctx.events.emit_shell_switch(&display_name);
301                        vm.exec_stmts(block.body()).await?;
302                    }
303                }
304                IrEffectItem::Cleanup { block, .. } => {
305                    cleanup_block = Some(block.clone());
306                }
307            }
308        }
309
310        // 7. Resolve expose declarations — mark which shells are exposed
311        let mut exposed: HashSet<String> = HashSet::new();
312        for expose in effect.exposes() {
313            let exposed_name = expose.exposed_name().to_string();
314            if let Some(qualifier) = expose.qualifier() {
315                // Qualified: `expose alias.shell [as name]` — from dependency
316                let dep = dep_shells.get(qualifier).ok_or_else(|| Failure::Runtime {
317                    message: format!(
318                        "effect `{}` expose references unknown alias `{}`",
319                        effect.name().name(),
320                        qualifier,
321                    ),
322                    span: None,
323                    shell: None,
324                })?;
325                let vm_arc = dep.get(expose.shell()).ok_or_else(|| Failure::Runtime {
326                    message: format!(
327                        "effect `{}` expose references shell `{}` not exposed by `{}`",
328                        effect.name().name(),
329                        expose.shell(),
330                        qualifier,
331                    ),
332                    span: None,
333                    shell: None,
334                })?;
335                shells.insert(exposed_name.clone(), vm_arc.clone());
336                exposed.insert(exposed_name);
337            } else {
338                // Simple: `expose shell [as name]` — local shell
339                if !shells.contains_key(expose.shell()) {
340                    return Err(Failure::Runtime {
341                        message: format!(
342                            "effect `{}` expose references unknown shell `{}`",
343                            effect.name().name(),
344                            expose.shell(),
345                        ),
346                        span: None,
347                        shell: None,
348                    });
349                }
350                if exposed_name != expose.shell() {
351                    // Aliased expose: insert under the exposed name too
352                    let vm_arc = shells.get(expose.shell()).unwrap().clone();
353                    shells.insert(exposed_name.clone(), vm_arc);
354                }
355                exposed.insert(exposed_name);
356            }
357        }
358
359        // If no expose declarations, fall back to exposing the first local shell
360        // (backwards compatibility for effects without explicit expose)
361        if exposed.is_empty()
362            && !shells.is_empty()
363            && let Some(first_name) = effect.body().iter().find_map(|item| {
364                if let IrEffectItem::Shell { block, .. } = item {
365                    Some(block.name().name().to_string())
366                } else {
367                    None
368                }
369            })
370            && shells.contains_key(&first_name)
371        {
372            exposed.insert(first_name);
373        }
374
375        // 8. Terminate non-exposed local shells (deduplicate by Arc pointer).
376        //    Collect pointers of exposed VMs first — a non-exposed key may alias
377        //    the same Arc as an exposed key (e.g. backwards-compat single-shell alias),
378        //    so we must not shut those down.
379        let exposed_ptrs: HashSet<usize> = shells
380            .iter()
381            .filter(|(k, _)| exposed.contains(k.as_str()))
382            .map(|(_, v)| Arc::as_ptr(v) as usize)
383            .collect();
384        let non_exposed_keys: Vec<String> = shells
385            .keys()
386            .filter(|k| !exposed.contains(k.as_str()))
387            .cloned()
388            .collect();
389        for key in non_exposed_keys {
390            if let Some(vm_arc) = shells.remove(&key) {
391                let ptr = Arc::as_ptr(&vm_arc) as usize;
392                if !exposed_ptrs.contains(&ptr) {
393                    vm_arc.lock().await.shutdown().await;
394                }
395            }
396        }
397
398        Ok(EffectHandle {
399            scope,
400            shells,
401            exposed,
402            dependencies: dep_keys,
403            cleanup: cleanup_block,
404        })
405    }
406
407    async fn eval_overlay(
408        &self,
409        start: &IrEffectStart,
410        caller_vars: &VarScope,
411        caller_env: &Arc<LayeredEnv>,
412    ) -> Result<Env, Failure> {
413        let mut overlay = Env::new();
414        for entry in start.overlay() {
415            let value = relux_ir::evaluator::eval_pure_expr(
416                entry.value(),
417                caller_vars,
418                caller_env,
419                &self.rt_ctx.tables.pure_fns,
420            );
421            overlay.insert(entry.key().name().to_string(), value);
422        }
423        Ok(overlay)
424    }
425
426    async fn eval_effect_let(
427        &self,
428        stmt: &IrPureLetStmt,
429        scope: &Scope,
430        effect_env: &Arc<LayeredEnv>,
431    ) {
432        let mut vars = scope.vars().lock().await;
433        let value = if let Some(expr) = stmt.value() {
434            relux_ir::evaluator::eval_pure_expr(
435                expr,
436                &vars,
437                effect_env,
438                &self.rt_ctx.tables.pure_fns,
439            )
440        } else {
441            String::new()
442        };
443        vars.insert(stmt.name().name().to_string(), value);
444    }
445
446    fn run_cleanup<'a>(
447        &'a self,
448        key: &'a EffectInstanceKey,
449    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Vec<Warning>> + Send + 'a>> {
450        Box::pin(async move { self.run_cleanup_inner(key).await })
451    }
452
453    async fn run_cleanup_inner(&self, key: &EffectInstanceKey) -> Vec<Warning> {
454        let slot = self.registry.slot(key);
455        let mut guard = slot.lock().await;
456        let mut warnings = Vec::new();
457
458        match &mut *guard {
459            EffectSlot::Ready { refcount, handle } => {
460                *refcount -= 1;
461
462                if *refcount == 0 {
463                    let effect_name = handle.scope.name().to_string();
464
465                    self.rt_ctx.events.emit_effect_teardown("", &effect_name);
466
467                    // 1. Shut down all VMs (exposed and non-exposed, deduplicated)
468                    let mut seen = HashSet::new();
469                    for vm_arc in handle.shells.values() {
470                        let ptr = Arc::as_ptr(vm_arc) as usize;
471                        if seen.insert(ptr) {
472                            vm_arc.lock().await.shutdown().await;
473                        }
474                    }
475
476                    // 2. Run cleanup block in fresh shell (best-effort)
477                    if let Some(cleanup_block) = &handle.cleanup {
478                        self.rt_ctx.events.emit_cleanup("__cleanup");
479                        let cleanup_result =
480                            self.run_cleanup_block(cleanup_block, &handle.scope).await;
481                        if let Err(failure) = cleanup_result {
482                            self.rt_ctx.events.emit_warning(
483                                "__cleanup",
484                                format!("effect {effect_name} cleanup failed"),
485                            );
486                            warnings.push(Warning::CleanupFailed {
487                                source: CleanupSource::Effect { name: effect_name },
488                                failure,
489                            });
490                        }
491                    }
492
493                    let deps = handle.dependencies.clone();
494                    *guard = EffectSlot::Empty;
495                    drop(guard);
496
497                    // 3. Recursively release dependencies
498                    for dep in &deps {
499                        warnings.extend(self.run_cleanup(dep).await);
500                    }
501                }
502            }
503            EffectSlot::Failed(_) => {
504                // nothing to clean up
505            }
506            EffectSlot::Empty => {
507                // Should not happen in normal use, but don't panic
508            }
509        }
510
511        warnings
512    }
513
514    async fn run_cleanup_block(
515        &self,
516        cleanup_block: &IrCleanupBlock,
517        scope: &Scope,
518    ) -> Result<(), Failure> {
519        let shell_state = ShellState::new("__cleanup".to_string(), None);
520        let ctx = ExecutionContext::new(
521            scope.clone(),
522            shell_state,
523            self.rt_ctx.shell.default_timeout.clone(),
524            self.rt_ctx.env.clone(),
525        );
526        // Cleanup uses its own uncancellable token
527        let mut cleanup_rt_ctx = self.rt_ctx.clone();
528        cleanup_rt_ctx.cancel = CancellationToken::new();
529        let mut vm = Vm::new("__cleanup".to_string(), ctx, &cleanup_rt_ctx).await?;
530        vm.exec_stmts(cleanup_block.body()).await?;
531        vm.shutdown().await;
532        Ok(())
533    }
534}