use std::any::Any;
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use uuid::Uuid;
pub const SESSION_REF_TAG: &str = "__blazen_session_ref__";
pub const MAX_SESSION_REFS_PER_RUN: usize = 10_000;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct RegistryKey(pub Uuid);
impl RegistryKey {
#[must_use]
pub fn new() -> Self {
Self(Uuid::new_v4())
}
pub fn parse(s: &str) -> Result<Self, uuid::Error> {
Uuid::parse_str(s).map(Self)
}
}
impl Default for RegistryKey {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Display for RegistryKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
type AnyArc = Arc<dyn Any + Send + Sync>;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteRefDescriptor {
pub origin_node_id: String,
pub type_tag: String,
pub created_at_epoch_ms: u64,
}
type SerializableArc = Arc<dyn SessionRefSerializable>;
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RefLifetime {
#[default]
UntilContextDrop,
UntilExplicitDrop,
UntilSnapshot,
UntilParentFinish,
}
#[derive(Default)]
pub struct SessionRefRegistry {
inner: RwLock<HashMap<RegistryKey, AnyArc>>,
serializable: RwLock<HashMap<RegistryKey, SerializableArc>>,
lifetimes: RwLock<HashMap<RegistryKey, RefLifetime>>,
remote_refs: RwLock<HashMap<RegistryKey, RemoteRefDescriptor>>,
}
impl std::fmt::Debug for SessionRefRegistry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SessionRefRegistry").finish_non_exhaustive()
}
}
impl SessionRefRegistry {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub async fn insert_arc(&self, value: AnyArc) -> Result<RegistryKey, SessionRefError> {
self.insert_arc_with_lifetime(value, RefLifetime::default())
.await
}
pub async fn insert_arc_with_lifetime(
&self,
value: AnyArc,
lifetime: RefLifetime,
) -> Result<RegistryKey, SessionRefError> {
let mut g = self.inner.write().await;
if g.len() >= MAX_SESSION_REFS_PER_RUN {
return Err(SessionRefError::CapacityExceeded {
cap: MAX_SESSION_REFS_PER_RUN,
});
}
let key = RegistryKey::new();
g.insert(key, value);
drop(g);
self.lifetimes.write().await.insert(key, lifetime);
Ok(key)
}
pub async fn insert<T: Any + Send + Sync + 'static>(
&self,
value: T,
) -> Result<RegistryKey, SessionRefError> {
self.insert_arc(Arc::new(value)).await
}
pub async fn insert_with_lifetime<T: Any + Send + Sync + 'static>(
&self,
value: T,
lifetime: RefLifetime,
) -> Result<RegistryKey, SessionRefError> {
self.insert_arc_with_lifetime(Arc::new(value), lifetime)
.await
}
pub async fn insert_serializable<T>(
&self,
value: Arc<T>,
) -> Result<RegistryKey, SessionRefError>
where
T: SessionRefSerializable + 'static,
{
let mut main = self.inner.write().await;
if main.len() >= MAX_SESSION_REFS_PER_RUN {
return Err(SessionRefError::CapacityExceeded {
cap: MAX_SESSION_REFS_PER_RUN,
});
}
let key = RegistryKey::new();
main.insert(key, value.clone() as AnyArc);
drop(main);
let mut ser = self.serializable.write().await;
ser.insert(key, value as SerializableArc);
drop(ser);
self.lifetimes
.write()
.await
.insert(key, RefLifetime::default());
Ok(key)
}
pub async fn insert_serializable_with_key(
&self,
key: RegistryKey,
value: Arc<dyn SessionRefSerializable>,
) -> Result<(), SessionRefError> {
let mut main = self.inner.write().await;
if main.len() >= MAX_SESSION_REFS_PER_RUN {
return Err(SessionRefError::CapacityExceeded {
cap: MAX_SESSION_REFS_PER_RUN,
});
}
let any_arc: AnyArc = Arc::new(SerializableHolder(value.clone()));
main.insert(key, any_arc);
drop(main);
let mut ser = self.serializable.write().await;
ser.insert(key, value);
drop(ser);
self.lifetimes
.write()
.await
.insert(key, RefLifetime::default());
Ok(())
}
pub async fn get_serializable(
&self,
key: RegistryKey,
) -> Option<Arc<dyn SessionRefSerializable>> {
self.serializable.read().await.get(&key).cloned()
}
pub async fn serializable_entries(
&self,
) -> Vec<(RegistryKey, Arc<dyn SessionRefSerializable>)> {
self.serializable
.read()
.await
.iter()
.map(|(k, v)| (*k, v.clone()))
.collect()
}
pub async fn insert_remote(
&self,
key: RegistryKey,
descriptor: RemoteRefDescriptor,
) -> Result<(), SessionRefError> {
let mut remote = self.remote_refs.write().await;
if remote.len() >= MAX_SESSION_REFS_PER_RUN {
return Err(SessionRefError::CapacityExceeded {
cap: MAX_SESSION_REFS_PER_RUN,
});
}
remote.insert(key, descriptor);
Ok(())
}
pub async fn get_remote(&self, key: RegistryKey) -> Option<RemoteRefDescriptor> {
self.remote_refs.read().await.get(&key).cloned()
}
pub async fn is_remote(&self, key: RegistryKey) -> bool {
self.remote_refs.read().await.contains_key(&key)
}
pub async fn remote_entries(&self) -> Vec<(RegistryKey, RemoteRefDescriptor)> {
self.remote_refs
.read()
.await
.iter()
.map(|(k, v)| (*k, v.clone()))
.collect()
}
pub async fn get_any(&self, key: RegistryKey) -> Option<AnyArc> {
self.inner.read().await.get(&key).cloned()
}
pub async fn get<T: Any + Send + Sync + 'static>(&self, key: RegistryKey) -> Option<Arc<T>> {
let any = self.inner.read().await.get(&key).cloned()?;
Arc::downcast::<T>(any).ok()
}
pub async fn remove(&self, key: RegistryKey) -> Option<AnyArc> {
let removed = self.inner.write().await.remove(&key);
self.serializable.write().await.remove(&key);
self.lifetimes.write().await.remove(&key);
self.remote_refs.write().await.remove(&key);
removed
}
pub async fn drain(&self) -> usize {
let mut g = self.inner.write().await;
let n = g.len();
g.clear();
drop(g);
self.serializable.write().await.clear();
self.lifetimes.write().await.clear();
self.remote_refs.write().await.clear();
n
}
pub async fn lifetime_of(&self, key: RegistryKey) -> Option<RefLifetime> {
self.lifetimes.read().await.get(&key).copied()
}
pub async fn clear_on_context_drop(&self, owns_registry: bool) -> usize {
let to_remove: Vec<RegistryKey> = {
let g = self.lifetimes.read().await;
g.iter()
.filter_map(|(k, lt)| match lt {
RefLifetime::UntilContextDrop | RefLifetime::UntilParentFinish
if owns_registry =>
{
Some(*k)
}
RefLifetime::UntilContextDrop
| RefLifetime::UntilExplicitDrop
| RefLifetime::UntilSnapshot
| RefLifetime::UntilParentFinish => None,
})
.collect()
};
let mut count = 0;
for key in to_remove {
if self.remove(key).await.is_some() {
count += 1;
}
}
count
}
pub async fn len(&self) -> usize {
self.inner.read().await.len()
}
pub async fn is_empty(&self) -> bool {
self.inner.read().await.is_empty()
}
pub async fn keys(&self) -> Vec<RegistryKey> {
self.inner.read().await.keys().copied().collect()
}
pub async fn clone_ref(&self, src_key: RegistryKey) -> Result<RegistryKey, SessionRefError> {
let src_any = {
let main = self.inner.read().await;
main.get(&src_key)
.cloned()
.ok_or(SessionRefError::KeyNotFound { key: src_key })?
};
let src_serializable = self.serializable.read().await.get(&src_key).cloned();
let src_lifetime = self
.lifetimes
.read()
.await
.get(&src_key)
.copied()
.unwrap_or_default();
let mut main = self.inner.write().await;
if main.len() >= MAX_SESSION_REFS_PER_RUN {
return Err(SessionRefError::CapacityExceeded {
cap: MAX_SESSION_REFS_PER_RUN,
});
}
let new_key = RegistryKey::new();
main.insert(new_key, src_any);
drop(main);
if let Some(ser) = src_serializable {
self.serializable.write().await.insert(new_key, ser);
}
self.lifetimes.write().await.insert(new_key, src_lifetime);
Ok(new_key)
}
pub async fn transfer_ref(
&self,
src_key: RegistryKey,
dst_registry: &SessionRefRegistry,
) -> Result<RegistryKey, SessionRefError> {
if std::ptr::eq(std::ptr::from_ref(self), std::ptr::from_ref(dst_registry)) {
let main = self.inner.read().await;
if main.contains_key(&src_key) {
return Ok(src_key);
}
return Err(SessionRefError::KeyNotFound { key: src_key });
}
let src_any = {
let main = self.inner.read().await;
main.get(&src_key)
.cloned()
.ok_or(SessionRefError::KeyNotFound { key: src_key })?
};
let src_serializable = self.serializable.read().await.get(&src_key).cloned();
let src_lifetime = self
.lifetimes
.read()
.await
.get(&src_key)
.copied()
.unwrap_or_default();
{
let mut dst_main = dst_registry.inner.write().await;
if dst_main.len() >= MAX_SESSION_REFS_PER_RUN {
return Err(SessionRefError::CapacityExceeded {
cap: MAX_SESSION_REFS_PER_RUN,
});
}
dst_main.insert(src_key, src_any);
}
if let Some(ser) = src_serializable {
dst_registry.serializable.write().await.insert(src_key, ser);
}
dst_registry
.lifetimes
.write()
.await
.insert(src_key, src_lifetime);
self.inner.write().await.remove(&src_key);
self.serializable.write().await.remove(&src_key);
self.lifetimes.write().await.remove(&src_key);
Ok(src_key)
}
}
#[cfg(not(target_arch = "wasm32"))]
tokio::task_local! {
pub static CURRENT_SESSION_REGISTRY: Arc<SessionRefRegistry>;
}
#[cfg(target_arch = "wasm32")]
mod wasm_task_local {
use std::cell::RefCell;
use std::future::Future;
use std::sync::Arc;
use super::SessionRefRegistry;
thread_local! {
static SLOT: RefCell<Option<Arc<SessionRefRegistry>>> = const { RefCell::new(None) };
}
#[derive(Debug)]
pub struct AccessError;
impl std::fmt::Display for AccessError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("task-local value not set")
}
}
impl std::error::Error for AccessError {}
pub struct LocalKey;
impl LocalKey {
pub async fn scope<F>(&'static self, value: Arc<SessionRefRegistry>, fut: F) -> F::Output
where
F: Future,
{
struct Guard(Option<Arc<SessionRefRegistry>>);
impl Drop for Guard {
fn drop(&mut self) {
SLOT.with(|s| *s.borrow_mut() = self.0.take());
}
}
let prev = SLOT.with(|s| s.borrow_mut().replace(value));
let _guard = Guard(prev);
fut.await
}
pub fn try_with<F, R>(&'static self, f: F) -> Result<R, AccessError>
where
F: FnOnce(&Arc<SessionRefRegistry>) -> R,
{
SLOT.with(|s| {
let borrow = s.borrow();
match borrow.as_ref() {
Some(v) => Ok(f(v)),
None => Err(AccessError),
}
})
}
}
pub static CURRENT_SESSION_REGISTRY: LocalKey = LocalKey;
}
#[cfg(target_arch = "wasm32")]
pub use wasm_task_local::CURRENT_SESSION_REGISTRY;
pub async fn with_session_registry<F, T>(registry: Arc<SessionRefRegistry>, fut: F) -> T
where
F: Future<Output = T>,
{
CURRENT_SESSION_REGISTRY.scope(registry, fut).await
}
#[must_use]
pub fn current_session_registry() -> Option<Arc<SessionRefRegistry>> {
CURRENT_SESSION_REGISTRY.try_with(Arc::clone).ok()
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SessionPausePolicy {
#[default]
PickleOrError,
PickleOrSerialize,
WarnDrop,
HardError,
}
pub const SERIALIZED_SESSION_REFS_META_KEY: &str = "__blazen_serialized_session_refs";
pub trait SessionRefSerializable: Any + Send + Sync {
fn blazen_serialize(&self) -> Result<Vec<u8>, SessionRefError>;
fn blazen_type_tag(&self) -> &'static str;
}
struct SerializableHolder(#[allow(dead_code)] Arc<dyn SessionRefSerializable>);
#[derive(Debug, thiserror::Error)]
pub enum SessionRefError {
#[error(
"session ref registry capacity exceeded ({cap} entries) — \
too many live references in this workflow run"
)]
CapacityExceeded {
cap: usize,
},
#[error("session ref serialization failed for type_tag {type_tag}: {source}")]
SerializationFailed {
type_tag: String,
#[source]
source: Box<dyn std::error::Error + Send + Sync>,
},
#[error("session ref key {key} not found in source registry")]
KeyNotFound {
key: RegistryKey,
},
#[error(
"session ref `{key}` is a remote ref on node `{origin_node_id}` — \
call peer.deref_session_ref() to resolve"
)]
RemoteRefNotResolved {
key: RegistryKey,
origin_node_id: String,
},
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn insert_and_get_roundtrip() {
let reg = SessionRefRegistry::new();
let key = reg.insert(42_i32).await.unwrap();
let got = reg.get::<i32>(key).await.unwrap();
assert_eq!(*got, 42);
}
#[tokio::test]
async fn get_wrong_type_returns_none() {
let reg = SessionRefRegistry::new();
let key = reg.insert(42_i32).await.unwrap();
assert!(reg.get::<String>(key).await.is_none());
}
#[tokio::test]
async fn remove_returns_value_and_clears() {
let reg = SessionRefRegistry::new();
let key = reg.insert("hello".to_owned()).await.unwrap();
assert_eq!(reg.len().await, 1);
let removed = reg.remove(key).await;
assert!(removed.is_some());
assert_eq!(reg.len().await, 0);
}
#[tokio::test]
async fn drain_clears_everything() {
let reg = SessionRefRegistry::new();
let _ = reg.insert(1_i32).await.unwrap();
let _ = reg.insert(2_i32).await.unwrap();
let _ = reg.insert(3_i32).await.unwrap();
assert_eq!(reg.drain().await, 3);
assert!(reg.is_empty().await);
}
#[tokio::test]
async fn capacity_cap_enforced() {
let reg = SessionRefRegistry::new();
for i in 0..100_i32 {
assert!(reg.insert(i).await.is_ok());
}
assert_eq!(reg.len().await, 100);
}
#[test]
fn registry_key_parse_roundtrip() {
let k = RegistryKey::new();
let s = k.to_string();
let parsed = RegistryKey::parse(&s).unwrap();
assert_eq!(k, parsed);
}
#[test]
fn session_pause_policy_default_is_pickle_or_error() {
assert_eq!(
SessionPausePolicy::default(),
SessionPausePolicy::PickleOrError
);
}
#[test]
fn session_pause_policy_serde_roundtrip() {
let p = SessionPausePolicy::WarnDrop;
let json = serde_json::to_string(&p).unwrap();
assert_eq!(json, "\"warn_drop\"");
let back: SessionPausePolicy = serde_json::from_str(&json).unwrap();
assert_eq!(back, p);
}
#[tokio::test]
async fn current_returns_none_outside_scope() {
assert!(current_session_registry().is_none());
}
#[tokio::test]
async fn current_returns_registry_inside_tokio_scope() {
let reg = Arc::new(SessionRefRegistry::new());
let reg_clone = Arc::clone(®);
with_session_registry(reg, async move {
let got = current_session_registry().expect("registry should be set");
assert!(Arc::ptr_eq(&got, ®_clone));
})
.await;
}
#[tokio::test]
async fn scope_isolates_concurrent_tasks() {
let reg_a = Arc::new(SessionRefRegistry::new());
let reg_b = Arc::new(SessionRefRegistry::new());
let _ = reg_a.insert(1_i32).await.unwrap();
let _ = reg_b.insert(2_i32).await.unwrap();
let a_clone = Arc::clone(®_a);
let b_clone = Arc::clone(®_b);
let task_a = tokio::spawn(async move {
with_session_registry(a_clone, async move {
let got = current_session_registry().unwrap();
got.len().await
})
.await
});
let task_b = tokio::spawn(async move {
with_session_registry(b_clone, async move {
let got = current_session_registry().unwrap();
got.len().await
})
.await
});
assert_eq!(task_a.await.unwrap(), 1);
assert_eq!(task_b.await.unwrap(), 1);
}
struct TestSerializable {
value: i32,
}
impl SessionRefSerializable for TestSerializable {
fn blazen_serialize(&self) -> Result<Vec<u8>, SessionRefError> {
Ok(self.value.to_be_bytes().to_vec())
}
fn blazen_type_tag(&self) -> &'static str {
"test::TestSerializable"
}
}
#[tokio::test]
async fn insert_serializable_populates_both_maps() {
let reg = SessionRefRegistry::new();
let key = reg
.insert_serializable(Arc::new(TestSerializable { value: 7 }))
.await
.unwrap();
let concrete = reg.get::<TestSerializable>(key).await.unwrap();
assert_eq!(concrete.value, 7);
let ser = reg.get_serializable(key).await.unwrap();
assert_eq!(ser.blazen_type_tag(), "test::TestSerializable");
assert_eq!(ser.blazen_serialize().unwrap(), vec![0, 0, 0, 7]);
}
#[tokio::test]
async fn serializable_entries_returns_all_live_keys() {
let reg = SessionRefRegistry::new();
let k1 = reg
.insert_serializable(Arc::new(TestSerializable { value: 1 }))
.await
.unwrap();
let k2 = reg
.insert_serializable(Arc::new(TestSerializable { value: 2 }))
.await
.unwrap();
let _ = reg.insert(99_i64).await.unwrap();
let entries = reg.serializable_entries().await;
assert_eq!(entries.len(), 2);
let got: std::collections::HashSet<RegistryKey> = entries.iter().map(|(k, _)| *k).collect();
let expected: std::collections::HashSet<RegistryKey> = [k1, k2].into_iter().collect();
assert_eq!(got, expected);
}
#[tokio::test]
async fn remove_clears_sidecar_entry() {
let reg = SessionRefRegistry::new();
let key = reg
.insert_serializable(Arc::new(TestSerializable { value: 5 }))
.await
.unwrap();
assert!(reg.get_serializable(key).await.is_some());
let _ = reg.remove(key).await;
assert!(reg.get_serializable(key).await.is_none());
assert!(reg.get::<TestSerializable>(key).await.is_none());
}
#[tokio::test]
async fn drain_clears_sidecar() {
let reg = SessionRefRegistry::new();
let _ = reg
.insert_serializable(Arc::new(TestSerializable { value: 1 }))
.await
.unwrap();
let _ = reg
.insert_serializable(Arc::new(TestSerializable { value: 2 }))
.await
.unwrap();
assert_eq!(reg.serializable_entries().await.len(), 2);
let drained = reg.drain().await;
assert_eq!(drained, 2);
assert_eq!(reg.serializable_entries().await.len(), 0);
}
#[tokio::test]
async fn insert_serializable_with_key_reuses_registry_key() {
let reg = SessionRefRegistry::new();
let key = RegistryKey::new();
let value: Arc<dyn SessionRefSerializable> = Arc::new(TestSerializable { value: 11 });
reg.insert_serializable_with_key(key, value).await.unwrap();
let ser = reg.get_serializable(key).await.unwrap();
assert_eq!(ser.blazen_serialize().unwrap(), vec![0, 0, 0, 11]);
assert!(reg.get_any(key).await.is_some());
}
#[test]
fn session_pause_policy_pickle_or_serialize_serde() {
let p = SessionPausePolicy::PickleOrSerialize;
let json = serde_json::to_string(&p).unwrap();
assert_eq!(json, "\"pickle_or_serialize\"");
let back: SessionPausePolicy = serde_json::from_str(&json).unwrap();
assert_eq!(back, p);
}
#[test]
fn ref_lifetime_default_is_until_context_drop() {
assert_eq!(RefLifetime::default(), RefLifetime::UntilContextDrop);
}
#[test]
fn ref_lifetime_serde_roundtrip_all_variants() {
for (variant, expected_json) in [
(RefLifetime::UntilContextDrop, "\"until_context_drop\""),
(RefLifetime::UntilExplicitDrop, "\"until_explicit_drop\""),
(RefLifetime::UntilSnapshot, "\"until_snapshot\""),
(RefLifetime::UntilParentFinish, "\"until_parent_finish\""),
] {
let json = serde_json::to_string(&variant).unwrap();
assert_eq!(json, expected_json);
let back: RefLifetime = serde_json::from_str(&json).unwrap();
assert_eq!(back, variant);
}
}
#[tokio::test]
async fn insert_default_lifetime_is_until_context_drop() {
let reg = SessionRefRegistry::new();
let key = reg.insert(42_i32).await.unwrap();
assert_eq!(
reg.lifetime_of(key).await,
Some(RefLifetime::UntilContextDrop)
);
}
#[tokio::test]
async fn insert_with_lifetime_records_explicit_policy() {
let reg = SessionRefRegistry::new();
let k1 = reg
.insert_with_lifetime(1_i32, RefLifetime::UntilExplicitDrop)
.await
.unwrap();
let k2 = reg
.insert_with_lifetime("hi".to_owned(), RefLifetime::UntilSnapshot)
.await
.unwrap();
let k3 = reg
.insert_with_lifetime(2_u64, RefLifetime::UntilParentFinish)
.await
.unwrap();
assert_eq!(
reg.lifetime_of(k1).await,
Some(RefLifetime::UntilExplicitDrop)
);
assert_eq!(reg.lifetime_of(k2).await, Some(RefLifetime::UntilSnapshot));
assert_eq!(
reg.lifetime_of(k3).await,
Some(RefLifetime::UntilParentFinish)
);
}
#[tokio::test]
async fn insert_arc_with_lifetime_records_explicit_policy() {
let reg = SessionRefRegistry::new();
let val: Arc<dyn Any + Send + Sync> = Arc::new(7_i32);
let key = reg
.insert_arc_with_lifetime(val, RefLifetime::UntilExplicitDrop)
.await
.unwrap();
assert_eq!(
reg.lifetime_of(key).await,
Some(RefLifetime::UntilExplicitDrop)
);
}
#[tokio::test]
async fn insert_serializable_records_default_lifetime() {
let reg = SessionRefRegistry::new();
let key = reg
.insert_serializable(Arc::new(TestSerializable { value: 9 }))
.await
.unwrap();
assert_eq!(
reg.lifetime_of(key).await,
Some(RefLifetime::UntilContextDrop)
);
}
#[tokio::test]
async fn insert_serializable_with_key_records_default_lifetime() {
let reg = SessionRefRegistry::new();
let key = RegistryKey::new();
let value: Arc<dyn SessionRefSerializable> = Arc::new(TestSerializable { value: 1 });
reg.insert_serializable_with_key(key, value).await.unwrap();
assert_eq!(
reg.lifetime_of(key).await,
Some(RefLifetime::UntilContextDrop)
);
}
#[tokio::test]
async fn insert_until_context_drop_is_cleared() {
let reg = SessionRefRegistry::new();
let key = reg.insert(42_i32).await.unwrap();
let removed = reg.clear_on_context_drop(true).await;
assert_eq!(removed, 1);
assert!(reg.get::<i32>(key).await.is_none());
assert_eq!(reg.lifetime_of(key).await, None);
assert!(reg.is_empty().await);
}
#[tokio::test]
async fn insert_until_explicit_drop_survives_clear() {
let reg = SessionRefRegistry::new();
let key = reg
.insert_with_lifetime(99_i32, RefLifetime::UntilExplicitDrop)
.await
.unwrap();
let removed = reg.clear_on_context_drop(true).await;
assert_eq!(removed, 0);
assert_eq!(reg.len().await, 1);
assert_eq!(*reg.get::<i32>(key).await.unwrap(), 99);
assert_eq!(
reg.lifetime_of(key).await,
Some(RefLifetime::UntilExplicitDrop)
);
assert!(reg.remove(key).await.is_some());
assert!(reg.get::<i32>(key).await.is_none());
assert_eq!(reg.lifetime_of(key).await, None);
}
#[tokio::test]
async fn insert_until_snapshot_survives_context_drop_but_purged_by_snapshot_walker() {
let reg = SessionRefRegistry::new();
let key = reg
.insert_with_lifetime("ephemeral".to_owned(), RefLifetime::UntilSnapshot)
.await
.unwrap();
let removed = reg.clear_on_context_drop(true).await;
assert_eq!(removed, 0);
assert_eq!(reg.len().await, 1);
assert_eq!(reg.lifetime_of(key).await, Some(RefLifetime::UntilSnapshot));
}
#[tokio::test]
async fn until_parent_finish_only_purged_when_owns_registry() {
let reg = SessionRefRegistry::new();
let key = reg
.insert_with_lifetime(123_i32, RefLifetime::UntilParentFinish)
.await
.unwrap();
let removed = reg.clear_on_context_drop(false).await;
assert_eq!(removed, 0);
assert_eq!(reg.len().await, 1);
assert_eq!(
reg.lifetime_of(key).await,
Some(RefLifetime::UntilParentFinish)
);
let removed = reg.clear_on_context_drop(true).await;
assert_eq!(removed, 1);
assert_eq!(reg.len().await, 0);
assert_eq!(reg.lifetime_of(key).await, None);
}
#[tokio::test]
async fn remove_clears_lifetime_sidecar() {
let reg = SessionRefRegistry::new();
let key = reg.insert(5_i32).await.unwrap();
assert!(reg.lifetime_of(key).await.is_some());
let _ = reg.remove(key).await;
assert!(reg.lifetime_of(key).await.is_none());
}
#[tokio::test]
async fn drain_clears_all_lifetimes() {
let reg = SessionRefRegistry::new();
let k1 = reg.insert(1_i32).await.unwrap();
let k2 = reg
.insert_with_lifetime(2_i32, RefLifetime::UntilExplicitDrop)
.await
.unwrap();
let k3 = reg
.insert_with_lifetime(3_i32, RefLifetime::UntilSnapshot)
.await
.unwrap();
assert_eq!(reg.drain().await, 3);
assert!(reg.lifetime_of(k1).await.is_none());
assert!(reg.lifetime_of(k2).await.is_none());
assert!(reg.lifetime_of(k3).await.is_none());
}
#[tokio::test]
async fn clear_on_context_drop_purges_mixed_population_correctly() {
let reg = SessionRefRegistry::new();
let k_default = reg.insert(0_i32).await.unwrap();
let k_explicit = reg
.insert_with_lifetime(1_i32, RefLifetime::UntilExplicitDrop)
.await
.unwrap();
let k_snapshot = reg
.insert_with_lifetime(2_i32, RefLifetime::UntilSnapshot)
.await
.unwrap();
let k_parent = reg
.insert_with_lifetime(3_i32, RefLifetime::UntilParentFinish)
.await
.unwrap();
let removed = reg.clear_on_context_drop(true).await;
assert_eq!(removed, 2);
assert!(reg.get::<i32>(k_default).await.is_none());
assert!(reg.get::<i32>(k_parent).await.is_none());
assert!(reg.get::<i32>(k_explicit).await.is_some());
assert!(reg.get::<i32>(k_snapshot).await.is_some());
}
#[tokio::test]
async fn clone_ref_creates_second_handle_to_same_value() {
let reg = SessionRefRegistry::new();
let src = reg.insert(123_i32).await.unwrap();
let dup = reg.clone_ref(src).await.unwrap();
assert_ne!(src, dup, "clone_ref must mint a fresh RegistryKey");
let a = reg.get::<i32>(src).await.unwrap();
let b = reg.get::<i32>(dup).await.unwrap();
assert_eq!(*a, 123);
assert_eq!(*b, 123);
assert!(
Arc::ptr_eq(&a, &b),
"both keys must resolve to the SAME Arc, not a deep copy"
);
assert!(reg.get::<i32>(src).await.is_some());
assert_eq!(reg.len().await, 2);
}
#[tokio::test]
async fn clone_ref_inherits_lifetime() {
let reg = SessionRefRegistry::new();
let src = reg
.insert_with_lifetime(7_i32, RefLifetime::UntilExplicitDrop)
.await
.unwrap();
let dup = reg.clone_ref(src).await.unwrap();
assert_eq!(
reg.lifetime_of(dup).await,
Some(RefLifetime::UntilExplicitDrop),
"cloned ref must inherit the source's lifetime policy"
);
assert_eq!(
reg.lifetime_of(src).await,
Some(RefLifetime::UntilExplicitDrop),
"source lifetime must be unchanged"
);
}
#[tokio::test]
async fn clone_ref_inherits_serializable_sidecar() {
let reg = SessionRefRegistry::new();
let src = reg
.insert_serializable(Arc::new(TestSerializable { value: 42 }))
.await
.unwrap();
let dup = reg.clone_ref(src).await.unwrap();
let ser = reg
.get_serializable(dup)
.await
.expect("cloned ref must carry a serializable sidecar");
assert_eq!(ser.blazen_type_tag(), "test::TestSerializable");
assert_eq!(ser.blazen_serialize().unwrap(), vec![0, 0, 0, 42]);
assert!(reg.get_serializable(src).await.is_some());
}
#[tokio::test]
async fn clone_ref_errors_on_missing_source() {
let reg = SessionRefRegistry::new();
let bogus = RegistryKey::new();
let err = reg.clone_ref(bogus).await.unwrap_err();
match err {
SessionRefError::KeyNotFound { key } => assert_eq!(key, bogus),
other => panic!("expected KeyNotFound, got {other:?}"),
}
}
#[tokio::test]
async fn clone_ref_errors_on_full_registry() {
let reg = SessionRefRegistry::new();
let src = reg.insert(1_i32).await.unwrap();
{
let mut main = reg.inner.write().await;
for _ in main.len()..MAX_SESSION_REFS_PER_RUN {
let k = RegistryKey::new();
main.insert(k, Arc::new(0_i32) as AnyArc);
}
}
assert_eq!(reg.len().await, MAX_SESSION_REFS_PER_RUN);
let err = reg.clone_ref(src).await.unwrap_err();
match err {
SessionRefError::CapacityExceeded { cap } => {
assert_eq!(cap, MAX_SESSION_REFS_PER_RUN);
}
other => panic!("expected CapacityExceeded, got {other:?}"),
}
}
#[tokio::test]
async fn transfer_ref_moves_value_to_destination() {
let src_reg = SessionRefRegistry::new();
let dst_reg = SessionRefRegistry::new();
let key = src_reg.insert(555_i32).await.unwrap();
assert_eq!(src_reg.len().await, 1);
assert_eq!(dst_reg.len().await, 0);
let returned = src_reg.transfer_ref(key, &dst_reg).await.unwrap();
assert_eq!(returned, key);
assert!(
src_reg.get::<i32>(key).await.is_none(),
"source must no longer resolve the transferred key"
);
assert_eq!(src_reg.len().await, 0);
let got = dst_reg
.get::<i32>(returned)
.await
.expect("destination must resolve the transferred key");
assert_eq!(*got, 555);
assert_eq!(dst_reg.len().await, 1);
}
#[tokio::test]
async fn transfer_ref_preserves_key() {
let src_reg = SessionRefRegistry::new();
let dst_reg = SessionRefRegistry::new();
let key = src_reg.insert("preserved".to_owned()).await.unwrap();
let returned = src_reg.transfer_ref(key, &dst_reg).await.unwrap();
assert_eq!(
returned, key,
"transfer must preserve the original RegistryKey so existing \
JSON markers keep resolving"
);
}
#[tokio::test]
async fn transfer_ref_migrates_lifetime() {
let src_reg = SessionRefRegistry::new();
let dst_reg = SessionRefRegistry::new();
let key = src_reg
.insert_with_lifetime(1_i32, RefLifetime::UntilExplicitDrop)
.await
.unwrap();
let returned = src_reg.transfer_ref(key, &dst_reg).await.unwrap();
assert_eq!(
src_reg.lifetime_of(returned).await,
None,
"source lifetime sidecar must be cleared"
);
assert_eq!(
dst_reg.lifetime_of(returned).await,
Some(RefLifetime::UntilExplicitDrop),
"destination lifetime sidecar must inherit the original policy"
);
}
#[tokio::test]
async fn transfer_ref_migrates_serializable_sidecar() {
let src_reg = SessionRefRegistry::new();
let dst_reg = SessionRefRegistry::new();
let key = src_reg
.insert_serializable(Arc::new(TestSerializable { value: 3 }))
.await
.unwrap();
let returned = src_reg.transfer_ref(key, &dst_reg).await.unwrap();
assert!(
src_reg.get_serializable(returned).await.is_none(),
"source sidecar must be cleared after transfer"
);
let ser = dst_reg
.get_serializable(returned)
.await
.expect("destination must carry the migrated sidecar");
assert_eq!(ser.blazen_type_tag(), "test::TestSerializable");
assert_eq!(ser.blazen_serialize().unwrap(), vec![0, 0, 0, 3]);
}
#[tokio::test]
async fn transfer_ref_errors_on_missing_source() {
let src_reg = SessionRefRegistry::new();
let dst_reg = SessionRefRegistry::new();
let bogus = RegistryKey::new();
let err = src_reg.transfer_ref(bogus, &dst_reg).await.unwrap_err();
match err {
SessionRefError::KeyNotFound { key } => assert_eq!(key, bogus),
other => panic!("expected KeyNotFound, got {other:?}"),
}
assert_eq!(src_reg.len().await, 0);
assert_eq!(dst_reg.len().await, 0);
}
#[tokio::test]
async fn transfer_ref_no_op_when_src_equals_dst() {
let reg = SessionRefRegistry::new();
let key = reg.insert(77_i32).await.unwrap();
let len_before = reg.len().await;
let returned = reg.transfer_ref(key, ®).await.unwrap();
assert_eq!(returned, key);
assert_eq!(reg.len().await, len_before);
let got = reg.get::<i32>(key).await.unwrap();
assert_eq!(*got, 77);
}
#[tokio::test]
async fn transfer_ref_no_op_errors_on_missing_key_in_same_registry() {
let reg = SessionRefRegistry::new();
let bogus = RegistryKey::new();
let err = reg.transfer_ref(bogus, ®).await.unwrap_err();
match err {
SessionRefError::KeyNotFound { key } => assert_eq!(key, bogus),
other => panic!("expected KeyNotFound, got {other:?}"),
}
}
#[tokio::test]
async fn transfer_ref_leaves_source_untouched_when_destination_full() {
let src_reg = SessionRefRegistry::new();
let dst_reg = SessionRefRegistry::new();
let key = src_reg.insert(13_i32).await.unwrap();
{
let mut main = dst_reg.inner.write().await;
for _ in 0..MAX_SESSION_REFS_PER_RUN {
let k = RegistryKey::new();
main.insert(k, Arc::new(0_i32) as AnyArc);
}
}
let err = src_reg.transfer_ref(key, &dst_reg).await.unwrap_err();
match err {
SessionRefError::CapacityExceeded { cap } => {
assert_eq!(cap, MAX_SESSION_REFS_PER_RUN);
}
other => panic!("expected CapacityExceeded, got {other:?}"),
}
assert_eq!(src_reg.len().await, 1);
let got = src_reg.get::<i32>(key).await.unwrap();
assert_eq!(*got, 13);
assert_eq!(
src_reg.lifetime_of(key).await,
Some(RefLifetime::UntilContextDrop),
);
}
fn make_descriptor(node: &str, tag: &str, ts: u64) -> RemoteRefDescriptor {
RemoteRefDescriptor {
origin_node_id: node.to_owned(),
type_tag: tag.to_owned(),
created_at_epoch_ms: ts,
}
}
#[tokio::test]
async fn insert_remote_stores_descriptor() {
let reg = SessionRefRegistry::new();
let key = RegistryKey::new();
let desc = make_descriptor("peer-1", "blazen::TestType", 1_700_000_000_000);
reg.insert_remote(key, desc.clone()).await.unwrap();
let got = reg
.get_remote(key)
.await
.expect("descriptor must be present");
assert_eq!(got.origin_node_id, "peer-1");
assert_eq!(got.type_tag, "blazen::TestType");
assert_eq!(got.created_at_epoch_ms, 1_700_000_000_000);
}
#[tokio::test]
async fn is_remote_distinguishes_local_and_remote() {
let reg = SessionRefRegistry::new();
let local_key = reg.insert(123_i32).await.unwrap();
let remote_key = RegistryKey::new();
reg.insert_remote(remote_key, make_descriptor("peer-2", "i32", 0))
.await
.unwrap();
assert!(
!reg.is_remote(local_key).await,
"locally-inserted refs must NOT report as remote"
);
assert!(
reg.is_remote(remote_key).await,
"remotely-inserted refs must report as remote"
);
assert!(!reg.is_remote(RegistryKey::new()).await);
}
#[tokio::test]
async fn remove_clears_remote_sidecar() {
let reg = SessionRefRegistry::new();
let key = RegistryKey::new();
reg.insert_remote(key, make_descriptor("peer-3", "tag", 42))
.await
.unwrap();
assert!(reg.is_remote(key).await);
let _ = reg.remove(key).await;
assert!(reg.get_remote(key).await.is_none());
assert!(!reg.is_remote(key).await);
}
#[tokio::test]
async fn drain_clears_remote_sidecar() {
let reg = SessionRefRegistry::new();
for i in 0..3 {
reg.insert_remote(RegistryKey::new(), make_descriptor("peer-drain", "tag", i))
.await
.unwrap();
}
assert_eq!(reg.remote_entries().await.len(), 3);
let _ = reg.drain().await;
assert!(
reg.remote_entries().await.is_empty(),
"drain must clear the remote_refs sidecar"
);
}
#[tokio::test]
async fn remote_entries_lists_all() {
let reg = SessionRefRegistry::new();
let mut keys = Vec::new();
for i in 0..3_u64 {
let key = RegistryKey::new();
reg.insert_remote(key, make_descriptor("peer-list", "tag", i))
.await
.unwrap();
keys.push(key);
}
let entries = reg.remote_entries().await;
assert_eq!(entries.len(), 3);
let listed: std::collections::HashSet<RegistryKey> =
entries.iter().map(|(k, _)| *k).collect();
let expected: std::collections::HashSet<RegistryKey> = keys.iter().copied().collect();
assert_eq!(listed, expected);
}
#[tokio::test]
async fn remote_refs_respect_capacity() {
let reg = SessionRefRegistry::new();
{
let mut remote = reg.remote_refs.write().await;
for i in 0..MAX_SESSION_REFS_PER_RUN {
remote.insert(
RegistryKey::new(),
make_descriptor("peer-cap", "tag", i as u64),
);
}
}
assert_eq!(reg.remote_entries().await.len(), MAX_SESSION_REFS_PER_RUN);
let err = reg
.insert_remote(RegistryKey::new(), make_descriptor("peer-cap", "tag", 0))
.await
.unwrap_err();
match err {
SessionRefError::CapacityExceeded { cap } => {
assert_eq!(cap, MAX_SESSION_REFS_PER_RUN);
}
other => panic!("expected CapacityExceeded, got {other:?}"),
}
}
#[tokio::test]
async fn local_and_remote_with_same_key_both_resolve() {
let reg = SessionRefRegistry::new();
let key = RegistryKey::new();
reg.insert_remote(key, make_descriptor("peer-shared", "i32", 7))
.await
.unwrap();
{
let mut main = reg.inner.write().await;
main.insert(key, Arc::new(456_i32) as AnyArc);
}
reg.lifetimes
.write()
.await
.insert(key, RefLifetime::default());
let local = reg
.get::<i32>(key)
.await
.expect("main `inner` lookup must resolve the materialized value");
assert_eq!(*local, 456);
let desc = reg
.get_remote(key)
.await
.expect("remote_refs sidecar must still hold the descriptor");
assert_eq!(desc.origin_node_id, "peer-shared");
assert_eq!(desc.type_tag, "i32");
assert_eq!(desc.created_at_epoch_ms, 7);
assert!(reg.is_remote(key).await);
}
#[tokio::test]
async fn remote_ref_not_resolved_error_carries_key_and_node() {
let key = RegistryKey::new();
let err = SessionRefError::RemoteRefNotResolved {
key,
origin_node_id: "peer-err".to_owned(),
};
let msg = err.to_string();
assert!(msg.contains(&key.to_string()));
assert!(msg.contains("peer-err"));
assert!(msg.contains("deref_session_ref"));
}
}