use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::Mutex as TokioMutex;
use tokio::sync::Notify;
use crate::observe::structured::SpanId;
use crate::report::result::ExecError;
use crate::vm::Vm;
use crate::vm::context::Scope;
use relux_core::diagnostics::EffectId as DiagEffectId;
use relux_core::pure::Env;
use relux_ir::IrCleanupBlock;
/// Map from a name to a shared [`Vm`] guarded by an async (tokio) mutex.
///
/// NOTE(review): the keys are presumably shell names — confirm against callers.
pub type ShellMap = HashMap<String, Arc<TokioMutex<Vm>>>;
/// String-to-string variable bindings.
pub type VarMap = HashMap<String, String>;
/// Bundle pairing an [`EffectInstanceKey`] with a shell map and a variable map.
///
/// NOTE(review): presumably what one effect instance makes available to its
/// dependents — confirm against the code that constructs this type.
pub struct ExportedEffect {
/// Effect instance key carried alongside the two maps.
pub key: EffectInstanceKey,
/// Shared VM handles, keyed by name.
pub shells: ShellMap,
/// String-to-string variable bindings.
pub vars: VarMap,
}
/// A shell map together with a variable map, without an instance key.
///
/// NOTE(review): presumably the view a consumer receives after acquiring an
/// effect — confirm against the code that constructs this type.
pub struct AcquiredEffect {
/// Shared VM handles, keyed by name.
pub shells: ShellMap,
/// String-to-string variable bindings.
pub vars: VarMap,
}
/// Hashable identity of one effect instance: the effect's id plus a string
/// encoding of overlay values.
///
/// Used as the key of the slot map in [`EffectRegistry`]. The derived `Hash`
/// is also what [`EffectInstanceKey::marker`] feeds to the hasher, so the
/// field order here influences the marker that is produced.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct EffectInstanceKey {
/// Identifier of the effect definition.
pub effect_id: DiagEffectId,
/// Identity string for the overlay. When built by
/// [`EffectInstanceKey::from_expects`] it is a sequence of `name\0value`
/// pairs joined by `\0`, in the order the expect names were given.
pub evaluated_overlay: String,
}
impl EffectInstanceKey {
    /// Builds a key from an effect id and the overlay values of its expects.
    ///
    /// Only the names listed in `expect_names` contribute to the identity;
    /// other entries of `evaluated_overlay` are ignored. Each name contributes
    /// `name\0value`, and consecutive entries are separated by another `\0`.
    /// A name with no value in the overlay contributes an empty value.
    pub fn from_expects(
        effect_id: DiagEffectId,
        expect_names: &[&str],
        evaluated_overlay: &Env,
    ) -> Self {
        let mut identity = String::new();
        for (position, name) in expect_names.iter().enumerate() {
            // Separator goes between entries only, never before the first.
            if position > 0 {
                identity.push('\0');
            }
            identity.push_str(name);
            identity.push('\0');
            identity.push_str(evaluated_overlay.get(name).unwrap_or(""));
        }

        Self {
            effect_id,
            evaluated_overlay: identity,
        }
    }

    /// Returns the mnemonic produced by `format_mnemonic` for this key's hash.
    ///
    /// The hash is computed with a freshly constructed `DefaultHasher`, so the
    /// same key yields the same marker within a given build.
    pub fn marker(&self) -> String {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let mut state = DefaultHasher::new();
        self.hash(&mut state);
        let digest = state.finish();
        relux_core::diagnostics::format_mnemonic(digest)
    }
}
/// Hashable identity of a shell: a shell name, optionally qualified by the
/// effect instance it belongs to.
///
/// The derived `Hash` is what [`ShellInstanceKey::marker`] feeds to the hasher.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum ShellInstanceKey {
/// A shell name qualified by an effect instance key.
Effect {
effect: EffectInstanceKey,
shell_name: String,
},
/// A bare shell name.
/// NOTE(review): presumably a shell opened by the test body itself — confirm.
Test {
shell_name: String,
},
}
impl ShellInstanceKey {
    /// Returns the mnemonic produced by `format_mnemonic` for this key's hash.
    ///
    /// The variant takes part in the derived hash, so an `Effect` key and a
    /// `Test` key with the same shell name are hashed differently.
    pub fn marker(&self) -> String {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let mut state = DefaultHasher::new();
        self.hash(&mut state);
        let digest = state.finish();
        relux_core::diagnostics::format_mnemonic(digest)
    }
}
/// Everything stored for one set-up effect instance while its slot is
/// [`EffectSlot::Ready`].
pub struct EffectHandle {
/// Scope of the effect; its `name()` is reported in
/// [`ReleaseOutcome::Deferred`].
pub scope: Scope,
/// All shells held by this handle, keyed by name.
pub shells: ShellMap,
/// Names of the entries in `shells` that [`EffectHandle::exposed_shells`]
/// returns.
pub exposed: HashSet<String>,
/// Variables returned by [`EffectHandle::exposed_vars`].
pub exposed_vars: VarMap,
/// Guards held by this handle.
/// NOTE(review): presumably one per dependency effect, released during
/// teardown — confirm against the teardown code.
pub dep_guards: Vec<EffectGuard>,
/// Optional cleanup block.
/// NOTE(review): presumably run when the last holder releases — confirm.
pub cleanup: Option<IrCleanupBlock>,
/// Span id copied into [`ReleaseOutcome::Deferred`] on a non-final release.
pub setup_span: SpanId,
/// Instance key of this effect.
pub key: EffectInstanceKey,
/// Marker string copied into [`ReleaseOutcome::Deferred`] on a non-final
/// release.
pub marker: String,
/// Optional alias, also copied into [`ReleaseOutcome::Deferred`].
pub alias: Option<String>,
}
impl EffectHandle {
pub fn exposed_shells(&self) -> ShellMap {
self.shells
.iter()
.filter(|(name, _)| self.exposed.contains(name.as_str()))
.map(|(k, v)| (k.clone(), v.clone()))
.collect()
}
pub fn exposed_vars(&self) -> &VarMap {
&self.exposed_vars
}
}
/// State of one effect instance's slot in the [`EffectRegistry`].
pub enum EffectSlot {
/// No handle is stored. Slots created by [`EffectRegistry::slot`] start
/// here, and [`EffectGuard::release`] resets the slot to this state when the
/// last holder releases.
Empty,
/// NOTE(review): presumably setup is in flight and waiters park on the
/// `Notify` — the transitions into and out of this state are not visible in
/// this part of the file; confirm.
Loading(Arc<Notify>),
/// A handle is stored and shared by the outstanding guards.
Ready {
/// Number of outstanding holders; decremented by each
/// [`EffectGuard::release`].
refcount: usize,
/// The stored handle; moved out to the releaser that brings `refcount` to
/// zero.
handle: Box<EffectHandle>,
},
/// Carries an [`ExecError`].
/// NOTE(review): presumably the error from a failed setup — confirm.
Failed(ExecError),
}
/// Result of [`EffectGuard::release`].
pub enum ReleaseOutcome {
/// This release brought the refcount to zero. The slot has been reset to
/// [`EffectSlot::Empty`] and the handle is handed to the caller.
LastHolder {
handle: Box<EffectHandle>,
},
/// Other holders remain, so the handle stays in the slot. The fields are
/// copies of metadata taken from the handle.
Deferred {
/// `handle.scope.name()` rendered as a `String`.
effect: String,
/// Clone of the handle's alias.
alias: Option<String>,
/// The handle's setup span id.
setup_span: SpanId,
/// Clone of the handle's marker.
marker: String,
},
/// The slot was not in the releasable state the guard expected (for
/// instance it was not `Ready`); signals refcount/outstanding-guard drift.
Drift,
}
/// One holder's claim on an effect slot.
///
/// The claim is given up by calling [`EffectGuard::release`], which consumes
/// the guard. No `Drop` implementation is visible in this part of the file, so
/// simply dropping a guard does not appear to decrement the refcount.
pub struct EffectGuard {
/// The shared slot this guard points at.
slot: Arc<TokioMutex<EffectSlot>>,
}
impl EffectGuard {
    /// Wraps `slot` in a guard.
    ///
    /// Constructing a guard does not touch the slot; the caller is responsible
    /// for having accounted for this guard in the slot's `refcount`.
    pub(crate) fn new(slot: Arc<TokioMutex<EffectSlot>>) -> Self {
        Self { slot }
    }

    /// Consumes the guard and gives up its hold on the slot.
    ///
    /// The slot's async mutex is held for the whole operation, so concurrent
    /// releases of the same slot are serialized.
    ///
    /// Returns:
    /// * [`ReleaseOutcome::LastHolder`] when this release brings the refcount
    ///   to zero; the slot is reset to [`EffectSlot::Empty`] and the handle is
    ///   moved out to the caller.
    /// * [`ReleaseOutcome::Deferred`] when other holders remain; the handle
    ///   stays in the slot and only metadata copies are returned.
    /// * [`ReleaseOutcome::Drift`] when the slot is not `Ready`, or is `Ready`
    ///   with a refcount that is already zero. Both mean the guard/refcount
    ///   bookkeeping is out of sync. In builds with debug assertions this
    ///   panics instead, so the drift is caught during development.
    pub async fn release(self) -> ReleaseOutcome {
        let mut slot = self.slot.lock().await;

        match &mut *slot {
            EffectSlot::Ready { refcount, handle } => {
                // A plain `-= 1` would wrap to `usize::MAX` in release builds
                // when the count is already zero, leaving the effect pinned
                // forever. Treat that as drift instead.
                let remaining = match refcount.checked_sub(1) {
                    Some(left) => left,
                    None => {
                        debug_assert!(
                            false,
                            "EffectGuard::release on a Ready slot whose refcount is already zero indicates a refcount/outstanding-guard drift",
                        );
                        return ReleaseOutcome::Drift;
                    }
                };
                *refcount = remaining;

                if remaining > 0 {
                    return ReleaseOutcome::Deferred {
                        effect: handle.scope.name().to_string(),
                        alias: handle.alias.clone(),
                        setup_span: handle.setup_span,
                        marker: handle.marker.clone(),
                    };
                }
            }
            EffectSlot::Empty | EffectSlot::Loading(_) | EffectSlot::Failed(_) => {
                debug_assert!(
                    false,
                    r#"EffectGuard::release on non-Ready slot indicates a refcount/outstanding-guard drift; every guard must point at a Ready slot until it is consumed"#,
                );
                return ReleaseOutcome::Drift;
            }
        }

        // Only the last holder of a Ready slot reaches this point: take the
        // handle out and leave the slot Empty.
        match std::mem::replace(&mut *slot, EffectSlot::Empty) {
            EffectSlot::Ready { handle, .. } => ReleaseOutcome::LastHolder { handle },
            _ => unreachable!("matched Ready above"),
        }
    }
}
/// Table of effect slots, one per [`EffectInstanceKey`].
///
/// The map itself sits behind a synchronous `std::sync::Mutex`; each slot is
/// individually guarded by an async (tokio) mutex and shared through an `Arc`.
pub struct EffectRegistry {
/// Slot per effect instance key; entries are created on first lookup by
/// [`EffectRegistry::slot`].
slots: std::sync::Mutex<HashMap<EffectInstanceKey, Arc<TokioMutex<EffectSlot>>>>,
}
impl Default for EffectRegistry {
fn default() -> Self {
Self::new()
}
}
impl EffectRegistry {
    /// Creates a registry with no slots.
    pub fn new() -> Self {
        let slots = std::sync::Mutex::new(HashMap::new());
        Self { slots }
    }

    /// Returns the shared slot for `key`, creating an [`EffectSlot::Empty`]
    /// slot the first time the key is seen.
    ///
    /// Repeated calls with an equal key return clones of the same `Arc`.
    ///
    /// # Panics
    ///
    /// Panics if the slot-map mutex is poisoned.
    pub fn slot(&self, key: &EffectInstanceKey) -> Arc<TokioMutex<EffectSlot>> {
        let mut table = self.slots.lock().expect("slot map mutex poisoned");

        if let Some(existing) = table.get(key) {
            return Arc::clone(existing);
        }

        let fresh = Arc::new(TokioMutex::new(EffectSlot::Empty));
        table.insert(key.clone(), Arc::clone(&fresh));
        fresh
    }
}
// Unit tests for key identity, the slot registry, guard release and marker
// generation.
#[cfg(test)]
mod tests {
use super::*;
// Builds a key in module "test.relux" for effect `name` with an empty overlay.
fn test_key(name: &str) -> EffectInstanceKey {
EffectInstanceKey {
effect_id: DiagEffectId {
module: relux_core::diagnostics::ModulePath("test.relux".into()),
name: relux_core::diagnostics::EffectName(name.to_string()),
},
evaluated_overlay: String::new(),
}
}
// Same as `test_key`, but with `overlay` stored verbatim as the identity string.
fn test_key_with_overlay(name: &str, overlay: &str) -> EffectInstanceKey {
EffectInstanceKey {
effect_id: DiagEffectId {
module: relux_core::diagnostics::ModulePath("test.relux".into()),
name: relux_core::diagnostics::EffectName(name.to_string()),
},
evaluated_overlay: overlay.to_string(),
}
}
// Minimal handle with no shells, vars, dependency guards or cleanup; only
// `marker` ("stub-marker-0000") is inspected by the tests below.
fn stub_handle() -> EffectHandle {
use crate::vm::context::Scope;
use relux_core::pure::VarScope;
EffectHandle {
scope: Scope::Test {
name: "stub".into(),
vars: Arc::new(TokioMutex::new(VarScope::new())),
timeout: None,
},
shells: HashMap::new(),
exposed: HashSet::new(),
exposed_vars: HashMap::new(),
dep_guards: Vec::new(),
cleanup: None,
setup_span: 0u64,
key: test_key("stub"),
marker: "stub-marker-0000".into(),
alias: None,
}
}
// Slot that is already `Ready` with the given refcount and a stub handle.
fn ready_slot(refcount: usize) -> Arc<TokioMutex<EffectSlot>> {
Arc::new(TokioMutex::new(EffectSlot::Ready {
refcount,
handle: Box::new(stub_handle()),
}))
}
// Keys built from identical inputs compare equal.
#[test]
fn key_equality_same() {
let k1 = test_key("Db");
let k2 = test_key("Db");
assert_eq!(k1, k2);
}
// A different effect name yields a different key.
#[test]
fn key_equality_different_name() {
let k1 = test_key("Db");
let k2 = test_key("Redis");
assert_ne!(k1, k2);
}
// A different overlay identity string yields a different key.
#[test]
fn key_equality_different_overlay() {
let k1 = test_key_with_overlay("Db", "PORT=5432");
let k2 = test_key_with_overlay("Db", "PORT=5433");
assert_ne!(k1, k2);
}
// Equal keys hash to the same value under `DefaultHasher`.
#[test]
fn key_hash_consistent() {
use std::collections::hash_map::DefaultHasher;
use std::hash::Hash;
use std::hash::Hasher;
let k1 = test_key("Db");
let k2 = test_key("Db");
let mut h1 = DefaultHasher::new();
let mut h2 = DefaultHasher::new();
k1.hash(&mut h1);
k2.hash(&mut h2);
assert_eq!(h1.finish(), h2.finish());
}
// A new registry holds no slots.
#[test]
fn registry_new_is_empty() {
let reg = EffectRegistry::new();
assert!(reg.slots.lock().unwrap().is_empty());
}
// The first lookup of a key creates a slot in the `Empty` state.
#[tokio::test]
async fn slot_creates_empty_on_first_access() {
let reg = EffectRegistry::new();
let key = test_key("Db");
let slot = reg.slot(&key);
let guard = slot.lock().await;
assert!(matches!(*guard, EffectSlot::Empty));
}
// Looking up the same key twice returns the same `Arc`.
#[tokio::test]
async fn slot_returns_same_arc_for_same_key() {
let reg = EffectRegistry::new();
let key = test_key("Db");
let s1 = reg.slot(&key);
let s2 = reg.slot(&key);
assert!(Arc::ptr_eq(&s1, &s2));
}
// Distinct keys get distinct slots.
#[tokio::test]
async fn slot_returns_different_arcs_for_different_keys() {
let reg = EffectRegistry::new();
let k1 = test_key("Db");
let k2 = test_key("Redis");
let s1 = reg.slot(&k1);
let s2 = reg.slot(&k2);
assert!(!Arc::ptr_eq(&s1, &s2));
}
// With refcount 2, one release yields `Deferred` carrying the handle's marker
// and leaves the slot `Ready` with refcount 1.
#[tokio::test]
async fn release_deferred_carries_metadata_and_decrements() {
let slot = ready_slot(2);
let g = EffectGuard::new(slot.clone());
match g.release().await {
ReleaseOutcome::Deferred { marker, .. } => {
assert_eq!(marker, "stub-marker-0000");
}
_ => panic!("non-last release should produce Deferred"),
}
let guard = slot.lock().await;
match &*guard {
EffectSlot::Ready { refcount, .. } => assert_eq!(*refcount, 1),
_ => panic!("slot should remain Ready with refcount 1"),
}
}
// With refcount 1, the release yields `LastHolder` with the handle and the
// slot becomes `Empty`.
#[tokio::test]
async fn release_last_holder_returns_handle_and_empties_slot() {
let slot = ready_slot(1);
let g = EffectGuard::new(slot.clone());
match g.release().await {
ReleaseOutcome::LastHolder { handle } => {
assert_eq!(handle.marker, "stub-marker-0000");
}
_ => panic!("last release should produce LastHolder"),
}
let guard = slot.lock().await;
assert!(matches!(*guard, EffectSlot::Empty), "slot should be Empty");
}
// Two releases driven together via `tokio::join!` produce exactly one
// `Deferred` and one `LastHolder`, and the slot ends up `Empty`.
#[tokio::test]
async fn concurrent_releases_serialize_via_slot_mutex() {
let slot = ready_slot(2);
let g1 = EffectGuard::new(slot.clone());
let g2 = EffectGuard::new(slot.clone());
let (a, b) = tokio::join!(g1.release(), g2.release());
let mut last_holder = 0usize;
let mut deferred = 0usize;
for outcome in [a, b] {
match outcome {
ReleaseOutcome::LastHolder { .. } => last_holder += 1,
ReleaseOutcome::Deferred { .. } => deferred += 1,
ReleaseOutcome::Drift => panic!("unexpected Drift"),
}
}
assert_eq!(last_holder, 1, "exactly one releaser is the last holder");
assert_eq!(deferred, 1, "exactly one releaser is deferred");
let guard = slot.lock().await;
assert!(matches!(*guard, EffectSlot::Empty));
}
// Keys built from overlays with different values for `A` must differ.
// NOTE(review): the embedded ",B=y" does not contain the `\0` separator that
// `from_expects` uses, and only "A" is expected, so this mainly checks that
// differing values of `A` give differing keys rather than separator escaping.
#[test]
fn from_expects_no_collision_when_value_contains_separator() {
use std::collections::HashMap;
let effect_id = DiagEffectId {
module: relux_core::diagnostics::ModulePath("test.relux".into()),
name: relux_core::diagnostics::EffectName("E".to_string()),
};
let mut overlay1 = HashMap::new();
overlay1.insert("A".into(), "x,B=y".into());
let env1 = relux_core::pure::Env::from_map(overlay1);
let mut overlay2 = HashMap::new();
overlay2.insert("A".into(), "x".into());
overlay2.insert("B".into(), "y".into());
let env2 = relux_core::pure::Env::from_map(overlay2);
let expects = &["A"];
let k1 = EffectInstanceKey::from_expects(effect_id.clone(), expects, &env1);
let k2 = EffectInstanceKey::from_expects(effect_id, expects, &env2);
assert_ne!(
k1, k2,
"different expect values must produce different keys"
);
}
// Overlay entries that are not listed in `expects` do not affect the key.
#[test]
fn from_expects_uses_only_expected_keys() {
use std::collections::HashMap;
let effect_id = DiagEffectId {
module: relux_core::diagnostics::ModulePath("test.relux".into()),
name: relux_core::diagnostics::EffectName("E".to_string()),
};
let mut overlay1 = HashMap::new();
overlay1.insert("PORT".into(), "5432".into());
overlay1.insert("EXTRA".into(), "foo".into());
let env1 = relux_core::pure::Env::from_map(overlay1);
let mut overlay2 = HashMap::new();
overlay2.insert("PORT".into(), "5432".into());
overlay2.insert("EXTRA".into(), "bar".into());
let env2 = relux_core::pure::Env::from_map(overlay2);
let expects = &["PORT"];
let k1 = EffectInstanceKey::from_expects(effect_id.clone(), expects, &env1);
let k2 = EffectInstanceKey::from_expects(effect_id, expects, &env2);
assert_eq!(
k1, k2,
"extra overlay keys beyond expects should not affect identity"
);
}
// Building the key twice from the same expects and overlay gives equal keys.
// NOTE(review): both calls pass ["A", "B"] in the same order, so this checks
// determinism; it does not exercise a different declaration order.
#[test]
fn from_expects_declaration_order_is_stable() {
use std::collections::HashMap;
let effect_id = DiagEffectId {
module: relux_core::diagnostics::ModulePath("test.relux".into()),
name: relux_core::diagnostics::EffectName("E".to_string()),
};
let mut overlay = HashMap::new();
overlay.insert("A".into(), "1".into());
overlay.insert("B".into(), "2".into());
let env = relux_core::pure::Env::from_map(overlay);
let k1 = EffectInstanceKey::from_expects(effect_id.clone(), &["A", "B"], &env);
let k2 = EffectInstanceKey::from_expects(effect_id, &["A", "B"], &env);
assert_eq!(k1, k2);
}
// With no expects, the overlay contents are irrelevant and the keys are equal.
#[test]
fn from_expects_empty_expects_produces_equal_keys() {
use std::collections::HashMap;
let effect_id = DiagEffectId {
module: relux_core::diagnostics::ModulePath("test.relux".into()),
name: relux_core::diagnostics::EffectName("E".to_string()),
};
let mut overlay1 = HashMap::new();
overlay1.insert("X".into(), "1".into());
let env1 = relux_core::pure::Env::from_map(overlay1);
let env2 = relux_core::pure::Env::from_map(HashMap::new());
let expects: &[&str] = &[];
let k1 = EffectInstanceKey::from_expects(effect_id.clone(), expects, &env1);
let k2 = EffectInstanceKey::from_expects(effect_id, expects, &env2);
assert_eq!(
k1, k2,
"effects with no expects should always share identity"
);
}
// Calling `marker` twice on the same key gives the same string.
#[test]
fn marker_is_stable_for_same_key() {
let k = test_key("FX");
assert_eq!(k.marker(), k.marker());
}
// Keys that differ only in overlay produce different markers.
#[test]
fn marker_differs_for_different_overlay() {
let a = test_key_with_overlay("FX", "alpha");
let b = test_key_with_overlay("FX", "beta");
assert_ne!(a.marker(), b.marker());
}
// The marker has three '-'-separated parts: two lowercase ASCII words and a
// four-digit number.
#[test]
fn marker_matches_mnemonic_format() {
let k = test_key("FX");
let m = k.marker();
let parts: Vec<&str> = m.split('-').collect();
assert_eq!(parts.len(), 3, "marker {m:?} should be adj-noun-NNNN");
assert!(parts[0].chars().all(|c| c.is_ascii_lowercase()));
assert!(parts[1].chars().all(|c| c.is_ascii_lowercase()));
assert_eq!(parts[2].len(), 4);
assert!(parts[2].chars().all(|c| c.is_ascii_digit()));
}
// `marker` is repeatable for an `Effect` shell key.
#[test]
fn shell_key_effect_marker_is_stable() {
let key = ShellInstanceKey::Effect {
effect: test_key("Db"),
shell_name: "redis".into(),
};
assert_eq!(key.marker(), key.marker());
}
// `marker` is repeatable for a `Test` shell key.
#[test]
fn shell_key_test_marker_is_stable() {
let key = ShellInstanceKey::Test {
shell_name: "default".into(),
};
assert_eq!(key.marker(), key.marker());
}
// Shell-key markers have three '-'-separated parts ending in four digits.
#[test]
fn shell_key_marker_matches_mnemonic_format() {
let key = ShellInstanceKey::Test {
shell_name: "default".into(),
};
let m = key.marker();
let parts: Vec<&str> = m.split('-').collect();
assert_eq!(parts.len(), 3, "marker {m:?} should be adj-noun-NNNN");
assert_eq!(parts[2].len(), 4);
assert!(parts[2].chars().all(|c| c.is_ascii_digit()));
}
// An `Effect` key and a `Test` key with the same shell name get different
// markers.
#[test]
fn shell_key_effect_vs_test_dont_collide_for_same_name() {
let effect_key = ShellInstanceKey::Effect {
effect: test_key("Db"),
shell_name: "default".into(),
};
let test_key_shell = ShellInstanceKey::Test {
shell_name: "default".into(),
};
assert_ne!(effect_key.marker(), test_key_shell.marker());
}
// Within one effect, different shell names get different markers.
#[test]
fn shell_key_different_shell_names_produce_distinct_markers() {
let a = ShellInstanceKey::Effect {
effect: test_key("Db"),
shell_name: "redis".into(),
};
let b = ShellInstanceKey::Effect {
effect: test_key("Db"),
shell_name: "postgres".into(),
};
assert_ne!(a.marker(), b.marker());
}
// The same shell name ("__cleanup") under different effects gets different
// markers.
#[test]
fn shell_key_cleanup_distinct_per_effect_instance() {
let a = ShellInstanceKey::Effect {
effect: test_key("Db"),
shell_name: "__cleanup".into(),
};
let b = ShellInstanceKey::Effect {
effect: test_key("Redis"),
shell_name: "__cleanup".into(),
};
assert_ne!(a.marker(), b.marker());
}
}