1pub mod registry;
2
3use std::collections::HashMap;
4use std::collections::HashSet;
5use std::sync::Arc;
6
7use tokio::sync::Mutex as TokioMutex;
8use tokio_util::sync::CancellationToken;
9
10use futures::future::join_all;
11
12use crate::RuntimeContext;
13use crate::effect::registry::EffectHandle;
14use crate::effect::registry::EffectInstanceKey;
15use crate::effect::registry::EffectRegistry;
16use crate::effect::registry::EffectSlot;
17use crate::report::result::Failure;
18use crate::vm::Vm;
19use crate::vm::context::ExecutionContext;
20use crate::vm::context::Scope;
21use crate::vm::context::ShellState;
22use relux_core::pure::Env;
23use relux_core::pure::LayeredEnv;
24use relux_core::pure::VarScope;
25use relux_ir::IrCleanupBlock;
26use relux_ir::IrEffectItem;
27use relux_ir::IrEffectStart;
28use relux_ir::IrPureLetStmt;
29
30#[derive(Debug, Clone)]
33pub enum CleanupSource {
34 Test,
35 Effect { name: String },
36}
37
38#[derive(Debug, Clone)]
39pub enum Warning {
40 CleanupFailed {
41 source: CleanupSource,
42 failure: Failure,
43 },
44}
45
46#[derive(Clone)]
49pub struct EffectManager {
50 registry: Arc<EffectRegistry>,
51 pub(crate) rt_ctx: RuntimeContext,
52}
53
54impl EffectManager {
55 pub fn new(registry: Arc<EffectRegistry>, rt_ctx: RuntimeContext) -> Self {
56 Self { registry, rt_ctx }
57 }
58
59 #[allow(clippy::type_complexity)]
66 pub fn instantiate<'a>(
67 &'a self,
68 starts: &'a [IrEffectStart],
69 caller_vars: &'a VarScope,
70 caller_env: &'a Arc<LayeredEnv>,
71 ) -> std::pin::Pin<
72 Box<
73 dyn std::future::Future<
74 Output = Result<
75 Vec<(EffectInstanceKey, HashMap<String, Arc<TokioMutex<Vm>>>)>,
76 Failure,
77 >,
78 > + Send
79 + 'a,
80 >,
81 > {
82 Box::pin(async move {
83 let mut results = Vec::with_capacity(starts.len());
84 for start in starts {
85 let evaluated = self.eval_overlay(start, caller_vars, caller_env).await?;
87
88 let expect_names: Vec<&str> = self
90 .rt_ctx
91 .tables
92 .effects
93 .get(start.effect())
94 .and_then(|r| r.as_ref().ok())
95 .map(|eff| eff.expects().iter().map(|e| e.name()).collect())
96 .unwrap_or_default();
97 let key = EffectInstanceKey::from_expects(
98 start.effect().clone(),
99 &expect_names,
100 &evaluated,
101 );
102
103 let shells = self
104 .acquire(&key, start, caller_vars, caller_env, evaluated)
105 .await?;
106 results.push((key, shells));
107 }
108 Ok(results)
109 })
110 }
111
112 pub async fn cleanup_all(&self) -> Vec<Warning> {
117 let keys = self.registry.acquired_keys();
118 let futures: Vec<_> = keys.iter().map(|key| self.run_cleanup(key)).collect();
119 let results = join_all(futures).await;
120 results.into_iter().flatten().collect()
121 }
122
123 async fn acquire(
124 &self,
125 key: &EffectInstanceKey,
126 start: &IrEffectStart,
127 caller_vars: &VarScope,
128 caller_env: &Arc<LayeredEnv>,
129 evaluated_overlay: Env,
130 ) -> Result<HashMap<String, Arc<TokioMutex<Vm>>>, Failure> {
131 let slot = self.registry.slot(key);
132 let mut guard = slot.lock().await;
133
134 let result = match &mut *guard {
135 EffectSlot::Ready { refcount, handle } => {
136 *refcount += 1;
137 Ok(handle.exposed_shells())
138 }
139 EffectSlot::Failed(failure) => Err(failure.clone()),
140 EffectSlot::Empty => match self
141 .bootstrap_effect(start, caller_vars, caller_env, evaluated_overlay)
142 .await
143 {
144 Ok(handle) => {
145 let exposed = handle.exposed_shells();
146 *guard = EffectSlot::Ready {
147 refcount: 1,
148 handle,
149 };
150 Ok(exposed)
151 }
152 Err(failure) => {
153 self.rt_ctx.events.emit_error("", failure.summary());
154 *guard = EffectSlot::Failed(failure.clone());
155 Err(failure)
156 }
157 },
158 };
159 if result.is_ok() {
160 self.registry.record_acquisition(key.clone());
161 }
162 result
163 }
164
165 async fn bootstrap_effect(
166 &self,
167 start: &IrEffectStart,
168 _caller_vars: &VarScope,
169 caller_env: &Arc<LayeredEnv>,
170 evaluated_overlay: Env,
171 ) -> Result<EffectHandle, Failure> {
172 let effect_name = start.effect().to_string();
173 self.rt_ctx.events.emit_effect_setup("", &effect_name);
174
175 let effect_result = self
176 .rt_ctx
177 .tables
178 .effects
179 .get(start.effect())
180 .ok_or_else(|| Failure::Runtime {
181 message: format!("effect {:?} not found in table", start.effect()),
182 span: None,
183 shell: None,
184 })?;
185 let effect = effect_result.as_ref().map_err(|e| Failure::Runtime {
186 message: format!("effect resolution failed: {e:?}"),
187 span: None,
188 shell: None,
189 })?;
190
191 let effect_env = Arc::new(LayeredEnv::child(caller_env.clone(), evaluated_overlay));
193
194 let scope = Scope::Effect {
196 name: effect.name().name().to_string(),
197 vars: Arc::new(TokioMutex::new(VarScope::new())),
198 _timeout: None,
199 env: effect_env.clone(),
200 };
201
202 for item in effect.body() {
204 if let IrEffectItem::Let { stmt, .. } = item {
205 self.eval_effect_let(stmt, &scope, &effect_env).await;
206 }
207 }
208
209 let effect_vars = scope.vars().lock().await.clone();
211 let exported_deps = self
212 .instantiate(effect.starts(), &effect_vars, &effect_env)
213 .await?;
214
215 let mut dep_shells: HashMap<String, HashMap<String, Arc<TokioMutex<Vm>>>> = HashMap::new();
217 let mut dep_keys: Vec<EffectInstanceKey> = Vec::new();
218 for (sub_start, (dep_key, exported)) in effect.starts().iter().zip(exported_deps) {
219 dep_keys.push(dep_key);
220 if let Some(alias) = sub_start.alias() {
221 dep_shells.insert(alias.to_string(), exported);
222 }
223 }
224
225 let mut reset_seen = HashSet::new();
227 for shells_map in dep_shells.values() {
228 for vm_arc in shells_map.values() {
229 let ptr = Arc::as_ptr(vm_arc) as usize;
230 if reset_seen.insert(ptr) {
231 vm_arc.lock().await.reset_for_export(scope.clone());
232 }
233 }
234 }
235
236 let mut shells: HashMap<String, Arc<TokioMutex<Vm>>> = HashMap::new();
241 for (alias, dep_exported) in &dep_shells {
242 if dep_exported.len() == 1 {
243 let vm_arc = dep_exported.values().next().unwrap().clone();
244 self.rt_ctx
245 .events
246 .emit_shell_alias(alias, vm_arc.lock().await.current_name());
247 shells.insert(alias.clone(), vm_arc);
248 }
249 }
250
251 let mut cleanup_block = None;
253 for item in effect.body() {
254 match item {
255 IrEffectItem::Comment { .. }
256 | IrEffectItem::Expect { .. }
257 | IrEffectItem::Start { .. }
258 | IrEffectItem::Expose { .. }
259 | IrEffectItem::Let { .. } => continue,
260 IrEffectItem::Shell { block, .. } => {
261 if let Some(qualifier) = block.qualifier() {
262 let alias = qualifier.name();
264 let shell_name = block.name().name();
265 let display = format!("{alias}.{shell_name}");
266 self.rt_ctx.events.emit_shell_switch(&display);
267 let dep = dep_shells.get(alias).ok_or_else(|| Failure::Runtime {
268 message: format!("unknown effect alias `{alias}`"),
269 span: None,
270 shell: None,
271 })?;
272 let vm_arc = dep.get(shell_name).ok_or_else(|| Failure::Runtime {
273 message: format!(
274 "effect alias `{alias}` does not expose shell `{shell_name}`"
275 ),
276 span: None,
277 shell: None,
278 })?;
279 let mut vm = vm_arc.lock().await;
280 self.rt_ctx.events.emit_shell_switch(vm.current_name());
281 vm.exec_stmts(block.body()).await?;
282 } else {
283 let name = block.name().name().to_string();
285 self.rt_ctx.events.emit_shell_switch(&name);
286 if !shells.contains_key(&name) {
287 let shell_state = ShellState::new(name.clone(), None);
288 let ctx = ExecutionContext::new(
289 scope.clone(),
290 shell_state,
291 self.rt_ctx.shell.default_timeout.clone(),
292 self.rt_ctx.env.clone(),
293 );
294 let vm = Vm::new(name.clone(), ctx, &self.rt_ctx).await?;
295 shells.insert(name.clone(), Arc::new(TokioMutex::new(vm)));
296 }
297 let vm_arc = shells.get(&name).expect("shell just inserted above");
298 let mut vm = vm_arc.lock().await;
299 let display_name = vm.current_name().to_string();
300 self.rt_ctx.events.emit_shell_switch(&display_name);
301 vm.exec_stmts(block.body()).await?;
302 }
303 }
304 IrEffectItem::Cleanup { block, .. } => {
305 cleanup_block = Some(block.clone());
306 }
307 }
308 }
309
310 let mut exposed: HashSet<String> = HashSet::new();
312 for expose in effect.exposes() {
313 let exposed_name = expose.exposed_name().to_string();
314 if let Some(qualifier) = expose.qualifier() {
315 let dep = dep_shells.get(qualifier).ok_or_else(|| Failure::Runtime {
317 message: format!(
318 "effect `{}` expose references unknown alias `{}`",
319 effect.name().name(),
320 qualifier,
321 ),
322 span: None,
323 shell: None,
324 })?;
325 let vm_arc = dep.get(expose.shell()).ok_or_else(|| Failure::Runtime {
326 message: format!(
327 "effect `{}` expose references shell `{}` not exposed by `{}`",
328 effect.name().name(),
329 expose.shell(),
330 qualifier,
331 ),
332 span: None,
333 shell: None,
334 })?;
335 shells.insert(exposed_name.clone(), vm_arc.clone());
336 exposed.insert(exposed_name);
337 } else {
338 if !shells.contains_key(expose.shell()) {
340 return Err(Failure::Runtime {
341 message: format!(
342 "effect `{}` expose references unknown shell `{}`",
343 effect.name().name(),
344 expose.shell(),
345 ),
346 span: None,
347 shell: None,
348 });
349 }
350 if exposed_name != expose.shell() {
351 let vm_arc = shells.get(expose.shell()).unwrap().clone();
353 shells.insert(exposed_name.clone(), vm_arc);
354 }
355 exposed.insert(exposed_name);
356 }
357 }
358
359 if exposed.is_empty()
362 && !shells.is_empty()
363 && let Some(first_name) = effect.body().iter().find_map(|item| {
364 if let IrEffectItem::Shell { block, .. } = item {
365 Some(block.name().name().to_string())
366 } else {
367 None
368 }
369 })
370 && shells.contains_key(&first_name)
371 {
372 exposed.insert(first_name);
373 }
374
375 let exposed_ptrs: HashSet<usize> = shells
380 .iter()
381 .filter(|(k, _)| exposed.contains(k.as_str()))
382 .map(|(_, v)| Arc::as_ptr(v) as usize)
383 .collect();
384 let non_exposed_keys: Vec<String> = shells
385 .keys()
386 .filter(|k| !exposed.contains(k.as_str()))
387 .cloned()
388 .collect();
389 for key in non_exposed_keys {
390 if let Some(vm_arc) = shells.remove(&key) {
391 let ptr = Arc::as_ptr(&vm_arc) as usize;
392 if !exposed_ptrs.contains(&ptr) {
393 vm_arc.lock().await.shutdown().await;
394 }
395 }
396 }
397
398 Ok(EffectHandle {
399 scope,
400 shells,
401 exposed,
402 dependencies: dep_keys,
403 cleanup: cleanup_block,
404 })
405 }
406
407 async fn eval_overlay(
408 &self,
409 start: &IrEffectStart,
410 caller_vars: &VarScope,
411 caller_env: &Arc<LayeredEnv>,
412 ) -> Result<Env, Failure> {
413 let mut overlay = Env::new();
414 for entry in start.overlay() {
415 let value = relux_ir::evaluator::eval_pure_expr(
416 entry.value(),
417 caller_vars,
418 caller_env,
419 &self.rt_ctx.tables.pure_fns,
420 );
421 overlay.insert(entry.key().name().to_string(), value);
422 }
423 Ok(overlay)
424 }
425
426 async fn eval_effect_let(
427 &self,
428 stmt: &IrPureLetStmt,
429 scope: &Scope,
430 effect_env: &Arc<LayeredEnv>,
431 ) {
432 let mut vars = scope.vars().lock().await;
433 let value = if let Some(expr) = stmt.value() {
434 relux_ir::evaluator::eval_pure_expr(
435 expr,
436 &vars,
437 effect_env,
438 &self.rt_ctx.tables.pure_fns,
439 )
440 } else {
441 String::new()
442 };
443 vars.insert(stmt.name().name().to_string(), value);
444 }
445
446 fn run_cleanup<'a>(
447 &'a self,
448 key: &'a EffectInstanceKey,
449 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Vec<Warning>> + Send + 'a>> {
450 Box::pin(async move { self.run_cleanup_inner(key).await })
451 }
452
453 async fn run_cleanup_inner(&self, key: &EffectInstanceKey) -> Vec<Warning> {
454 let slot = self.registry.slot(key);
455 let mut guard = slot.lock().await;
456 let mut warnings = Vec::new();
457
458 match &mut *guard {
459 EffectSlot::Ready { refcount, handle } => {
460 *refcount -= 1;
461
462 if *refcount == 0 {
463 let effect_name = handle.scope.name().to_string();
464
465 self.rt_ctx.events.emit_effect_teardown("", &effect_name);
466
467 let mut seen = HashSet::new();
469 for vm_arc in handle.shells.values() {
470 let ptr = Arc::as_ptr(vm_arc) as usize;
471 if seen.insert(ptr) {
472 vm_arc.lock().await.shutdown().await;
473 }
474 }
475
476 if let Some(cleanup_block) = &handle.cleanup {
478 self.rt_ctx.events.emit_cleanup("__cleanup");
479 let cleanup_result =
480 self.run_cleanup_block(cleanup_block, &handle.scope).await;
481 if let Err(failure) = cleanup_result {
482 self.rt_ctx.events.emit_warning(
483 "__cleanup",
484 format!("effect {effect_name} cleanup failed"),
485 );
486 warnings.push(Warning::CleanupFailed {
487 source: CleanupSource::Effect { name: effect_name },
488 failure,
489 });
490 }
491 }
492
493 let deps = handle.dependencies.clone();
494 *guard = EffectSlot::Empty;
495 drop(guard);
496
497 for dep in &deps {
499 warnings.extend(self.run_cleanup(dep).await);
500 }
501 }
502 }
503 EffectSlot::Failed(_) => {
504 }
506 EffectSlot::Empty => {
507 }
509 }
510
511 warnings
512 }
513
514 async fn run_cleanup_block(
515 &self,
516 cleanup_block: &IrCleanupBlock,
517 scope: &Scope,
518 ) -> Result<(), Failure> {
519 let shell_state = ShellState::new("__cleanup".to_string(), None);
520 let ctx = ExecutionContext::new(
521 scope.clone(),
522 shell_state,
523 self.rt_ctx.shell.default_timeout.clone(),
524 self.rt_ctx.env.clone(),
525 );
526 let mut cleanup_rt_ctx = self.rt_ctx.clone();
528 cleanup_rt_ctx.cancel = CancellationToken::new();
529 let mut vm = Vm::new("__cleanup".to_string(), ctx, &cleanup_rt_ctx).await?;
530 vm.exec_stmts(cleanup_block.body()).await?;
531 vm.shutdown().await;
532 Ok(())
533 }
534}