use std::{fmt, sync::Arc};
#[cfg(any(test, feature = "testing"))]
use std::collections::HashMap;
#[cfg(any(test, feature = "testing"))]
use tokio::sync::RwLock;
use crate::{
error::EngineError,
ids::{ConversationId, CorrelationId, ProcessId, ProcessIdentity, TenantId},
};
/// Maximum length, in bytes, of a key accepted by [`RegistryKey::parse`] and
/// [`RegistryKey::from_static`].
pub const MAX_REGISTRY_KEY_LEN: usize = 256;
/// String key under which a process is registered in a [`ProcessRegistry`].
///
/// The key text is stored as a `Box<str>`. [`RegistryKey::parse`] and
/// [`RegistryKey::from_static`] reject NUL bytes and keys longer than
/// [`MAX_REGISTRY_KEY_LEN`] bytes; the id-based `from_*` constructors build the
/// key from formatted ids without running those checks.
///
/// NOTE(review): the derived `Deserialize` presumably builds the key directly
/// from the serialized string without going through `parse`, so deserialized
/// keys may bypass the NUL/length checks — confirm whether that is intended.
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct RegistryKey(Box<str>);
impl RegistryKey {
#[must_use]
pub fn from_conversation_and_sender(id: ConversationId, sender_gln: &str) -> Self {
let key = format!("{sender_gln}:{id}");
Self(key.into_boxed_str())
}
#[must_use]
pub fn from_correlation(id: CorrelationId) -> Self {
Self(id.to_string().into_boxed_str())
}
#[must_use]
pub fn from_process(id: ProcessId) -> Self {
Self(id.to_string().into_boxed_str())
}
pub fn parse(s: &str) -> Result<Self, EngineError> {
if s.contains('\0') {
return Err(EngineError::registry(
"registry key must not contain NUL bytes",
));
}
if s.len() > MAX_REGISTRY_KEY_LEN {
return Err(EngineError::registry(format!(
"registry key is {} bytes, exceeds maximum of {MAX_REGISTRY_KEY_LEN}",
s.len()
)));
}
Ok(Self(s.into()))
}
#[must_use]
pub fn from_static(s: &'static str) -> Self {
assert!(
!s.contains('\0'),
"RegistryKey::from_static: key must not contain NUL bytes"
);
assert!(
s.len() <= MAX_REGISTRY_KEY_LEN,
"RegistryKey::from_static: key exceeds MAX_REGISTRY_KEY_LEN ({MAX_REGISTRY_KEY_LEN} bytes)"
);
Self(s.into())
}
#[must_use]
pub fn as_str(&self) -> &str {
&self.0
}
}
impl fmt::Display for RegistryKey {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.0)
}
}
impl std::str::FromStr for RegistryKey {
type Err = EngineError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
RegistryKey::parse(s)
}
}
/// Storage abstraction that maps tenant-scoped keys and correlation tags to
/// [`ProcessIdentity`] values.
///
/// Every operation is asynchronous and reports failures as [`EngineError`].
/// Implementors must be `Send + Sync`.
// NOTE(review): `async fn` in a public trait triggers the `async_fn_in_trait`
// lint (callers cannot require auto-trait bounds such as `Send` on the returned
// futures); it is silenced here, presumably as an accepted trade-off.
#[allow(async_fn_in_trait)]
pub trait ProcessRegistry: Send + Sync {
    /// Associates `key` with `identity` for `tenant_id`.
    ///
    /// NOTE(review): the in-memory implementation in this file overwrites an
    /// existing entry for the same `(tenant_id, key)`; whether every backend
    /// must behave as an upsert is not stated here.
    async fn register(
        &self,
        tenant_id: TenantId,
        key: &RegistryKey,
        identity: ProcessIdentity,
    ) -> Result<(), EngineError>;

    /// Returns the identity registered under `(tenant_id, key)`, or `None`
    /// when there is no such registration.
    async fn lookup(
        &self,
        tenant_id: TenantId,
        key: &RegistryKey,
    ) -> Result<Option<ProcessIdentity>, EngineError>;

    /// Removes the registration for `(tenant_id, key)`.
    async fn remove(&self, tenant_id: TenantId, key: &RegistryKey) -> Result<(), EngineError>;

    /// Reports whether `(tenant_id, key)` is registered.
    ///
    /// The provided implementation performs a [`lookup`](Self::lookup) and
    /// discards the identity.
    async fn contains(&self, tenant_id: TenantId, key: &RegistryKey) -> Result<bool, EngineError> {
        let found = self.lookup(tenant_id, key).await?;
        Ok(found.is_some())
    }

    /// Returns the number of registrations held by the registry.
    ///
    /// The count takes no tenant argument, so it is not tenant-scoped.
    async fn len(&self) -> Result<usize, EngineError>;

    /// Reports whether [`len`](Self::len) is zero.
    async fn is_empty(&self) -> Result<bool, EngineError> {
        let count = self.len().await?;
        Ok(count == 0)
    }

    /// Registers `identity` under the correlation `tag` for `process_id`
    /// within `tenant_id`.
    async fn register_correlated(
        &self,
        tenant_id: TenantId,
        tag: &str,
        process_id: crate::ids::ProcessId,
        identity: ProcessIdentity,
    ) -> Result<(), EngineError>;

    /// Returns every identity registered under `tag` for `tenant_id`.
    async fn lookup_correlated(
        &self,
        tenant_id: TenantId,
        tag: &str,
    ) -> Result<Vec<ProcessIdentity>, EngineError>;

    /// Removes the correlated registration identified by
    /// `(tenant_id, tag, process_id)`.
    async fn remove_correlated(
        &self,
        tenant_id: TenantId,
        tag: &str,
        process_id: crate::ids::ProcessId,
    ) -> Result<(), EngineError>;
}
impl<S: ProcessRegistry> ProcessRegistry for Arc<S> {
async fn register(
&self,
tenant_id: TenantId,
key: &RegistryKey,
identity: ProcessIdentity,
) -> Result<(), EngineError> {
self.as_ref().register(tenant_id, key, identity).await
}
async fn lookup(
&self,
tenant_id: TenantId,
key: &RegistryKey,
) -> Result<Option<ProcessIdentity>, EngineError> {
self.as_ref().lookup(tenant_id, key).await
}
async fn remove(&self, tenant_id: TenantId, key: &RegistryKey) -> Result<(), EngineError> {
self.as_ref().remove(tenant_id, key).await
}
async fn len(&self) -> Result<usize, EngineError> {
self.as_ref().len().await
}
async fn register_correlated(
&self,
tenant_id: TenantId,
tag: &str,
process_id: crate::ids::ProcessId,
identity: ProcessIdentity,
) -> Result<(), EngineError> {
self.as_ref()
.register_correlated(tenant_id, tag, process_id, identity)
.await
}
async fn lookup_correlated(
&self,
tenant_id: TenantId,
tag: &str,
) -> Result<Vec<ProcessIdentity>, EngineError> {
self.as_ref().lookup_correlated(tenant_id, tag).await
}
async fn remove_correlated(
&self,
tenant_id: TenantId,
tag: &str,
process_id: crate::ids::ProcessId,
) -> Result<(), EngineError> {
self.as_ref()
.remove_correlated(tenant_id, tag, process_id)
.await
}
}
/// Unit-struct registry that stores nothing.
///
/// The type itself is always compiled, but its [`ProcessRegistry`]
/// implementation in this file is gated behind
/// `cfg(any(test, feature = "testing"))`. In that implementation every write
/// succeeds without recording anything and every read reports "nothing
/// registered".
#[derive(Debug, Clone, Copy, Default)]
#[must_use = "NoopProcessRegistry discards all routing registrations silently — use a persistent ProcessRegistry in production"]
pub struct NoopProcessRegistry;
/// No-op [`ProcessRegistry`]: writes succeed and are discarded, reads always
/// report that nothing is registered. Only compiled for tests or with the
/// `testing` feature.
#[cfg(any(test, feature = "testing"))]
impl ProcessRegistry for NoopProcessRegistry {
    /// Accepts the registration and discards it.
    async fn register(
        &self,
        _tenant: TenantId,
        _registry_key: &RegistryKey,
        _process: ProcessIdentity,
    ) -> Result<(), EngineError> {
        Ok(())
    }

    /// Always resolves to `None`.
    async fn lookup(
        &self,
        _tenant: TenantId,
        _registry_key: &RegistryKey,
    ) -> Result<Option<ProcessIdentity>, EngineError> {
        Ok(Option::None)
    }

    /// Nothing is stored, so there is nothing to remove; always succeeds.
    async fn remove(&self, _tenant: TenantId, _registry_key: &RegistryKey) -> Result<(), EngineError> {
        Ok(())
    }

    /// Always reports zero registrations.
    async fn len(&self) -> Result<usize, EngineError> {
        Ok(0_usize)
    }

    /// Accepts the correlated registration and discards it.
    async fn register_correlated(
        &self,
        _tenant: TenantId,
        _correlation_tag: &str,
        _pid: crate::ids::ProcessId,
        _process: ProcessIdentity,
    ) -> Result<(), EngineError> {
        Ok(())
    }

    /// Always resolves to an empty list.
    async fn lookup_correlated(
        &self,
        _tenant: TenantId,
        _correlation_tag: &str,
    ) -> Result<Vec<ProcessIdentity>, EngineError> {
        Ok(Vec::new())
    }

    /// Nothing is stored, so there is nothing to remove; always succeeds.
    async fn remove_correlated(
        &self,
        _tenant: TenantId,
        _correlation_tag: &str,
        _pid: crate::ids::ProcessId,
    ) -> Result<(), EngineError> {
        Ok(())
    }
}
/// In-memory [`ProcessRegistry`] backed by two `HashMap`s behind async
/// `RwLock`s; compiled only for tests or with the `testing` feature.
///
/// Both maps are held in `Arc`s, so a clone of the registry shares state with
/// the value it was cloned from.
#[cfg(any(test, feature = "testing"))]
#[derive(Debug, Default, Clone)]
pub struct InMemoryProcessRegistry {
/// Keyed registrations: `(tenant, key text)` -> identity.
#[allow(clippy::type_complexity)]
inner: Arc<RwLock<HashMap<(TenantId, Box<str>), ProcessIdentity>>>,
/// Correlated registrations: `(tenant, tag, process id)` -> identity, so one
/// tag can map to several processes.
#[allow(clippy::type_complexity)]
correlated: Arc<RwLock<HashMap<(TenantId, Box<str>, crate::ids::ProcessId), ProcessIdentity>>>,
}
#[cfg(any(test, feature = "testing"))]
impl InMemoryProcessRegistry {
    /// Creates a registry with no registrations (same as `Default`).
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Reports whether the keyed map holds no entries.
    ///
    /// Only keyed registrations are inspected; correlated registrations are
    /// not taken into account.
    pub async fn is_empty_async(&self) -> bool {
        let keyed = self.inner.read().await;
        keyed.is_empty()
    }
}
#[cfg(any(test, feature = "testing"))]
impl ProcessRegistry for InMemoryProcessRegistry {
    /// Inserts `identity` under `(tenant_id, key)`, replacing any previous
    /// entry for the same pair.
    async fn register(
        &self,
        tenant_id: TenantId,
        key: &RegistryKey,
        identity: ProcessIdentity,
    ) -> Result<(), EngineError> {
        let entry_key = (tenant_id, key.0.clone());
        let mut keyed = self.inner.write().await;
        keyed.insert(entry_key, identity);
        Ok(())
    }

    /// Returns a clone of the identity stored under `(tenant_id, key)`, if any.
    async fn lookup(
        &self,
        tenant_id: TenantId,
        key: &RegistryKey,
    ) -> Result<Option<ProcessIdentity>, EngineError> {
        // The map key is an owned tuple, so probing needs an owned copy of the text.
        let probe = (tenant_id, key.0.clone());
        let keyed = self.inner.read().await;
        let found = keyed.get(&probe).cloned();
        Ok(found)
    }

    /// Drops the entry for `(tenant_id, key)`; succeeds even when it is absent.
    async fn remove(&self, tenant_id: TenantId, key: &RegistryKey) -> Result<(), EngineError> {
        let probe = (tenant_id, key.0.clone());
        let mut keyed = self.inner.write().await;
        keyed.remove(&probe);
        Ok(())
    }

    /// Counts keyed registrations across all tenants; correlated
    /// registrations are not included.
    async fn len(&self) -> Result<usize, EngineError> {
        let keyed = self.inner.read().await;
        Ok(keyed.len())
    }

    /// Inserts `identity` under `(tenant_id, tag, process_id)`, replacing any
    /// previous entry for the same triple.
    async fn register_correlated(
        &self,
        tenant_id: TenantId,
        tag: &str,
        process_id: crate::ids::ProcessId,
        identity: ProcessIdentity,
    ) -> Result<(), EngineError> {
        let entry_key = (tenant_id, Box::<str>::from(tag), process_id);
        let mut by_tag = self.correlated.write().await;
        by_tag.insert(entry_key, identity);
        Ok(())
    }

    /// Collects clones of every identity whose entry matches `tenant_id` and
    /// `tag`, by scanning the whole correlated map. The result follows the
    /// map's iteration order.
    async fn lookup_correlated(
        &self,
        tenant_id: TenantId,
        tag: &str,
    ) -> Result<Vec<ProcessIdentity>, EngineError> {
        let by_tag = self.correlated.read().await;
        let mut matches = Vec::new();
        for ((entry_tenant, entry_tag, _), identity) in by_tag.iter() {
            if *entry_tenant == tenant_id && &**entry_tag == tag {
                matches.push(identity.clone());
            }
        }
        Ok(matches)
    }

    /// Drops the entry for `(tenant_id, tag, process_id)`; succeeds even when
    /// it is absent.
    async fn remove_correlated(
        &self,
        tenant_id: TenantId,
        tag: &str,
        process_id: crate::ids::ProcessId,
    ) -> Result<(), EngineError> {
        let probe = (tenant_id, Box::<str>::from(tag), process_id);
        let mut by_tag = self.correlated.write().await;
        by_tag.remove(&probe);
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        ids::{ProcessId, TenantId},
        version::WorkflowId,
    };

    /// Builds an identity from `ProcessId::new()`, `TenantId::new()` and a
    /// fixed workflow id.
    fn make_identity() -> ProcessIdentity {
        ProcessIdentity::new(
            ProcessId::new(),
            TenantId::new(),
            WorkflowId::new("test", "FV2024-10-01"),
        )
    }

    /// Shorthand for `TenantId::new()`.
    fn tid() -> TenantId {
        TenantId::new()
    }

    /// Parses `s` into a `RegistryKey`, panicking when it is not a valid key.
    fn key(s: &str) -> RegistryKey {
        let parsed = RegistryKey::parse(s);
        parsed.expect("valid test key")
    }

    /// A registered identity can be read back under the same tenant and key.
    #[tokio::test]
    async fn register_and_lookup() {
        let registry = InMemoryProcessRegistry::new();
        let tenant = tid();
        let expected = make_identity();

        registry
            .register(tenant, &key("conv:abc"), expected.clone())
            .await
            .unwrap();

        let looked_up = registry.lookup(tenant, &key("conv:abc")).await.unwrap();
        let actual = looked_up.expect("must be found");
        assert_eq!(actual.process_id, expected.process_id);
    }

    /// Looking up a key that was never registered yields `None`.
    #[tokio::test]
    async fn lookup_returns_none_for_unknown_key() {
        let registry = InMemoryProcessRegistry::new();
        let looked_up = registry.lookup(tid(), &key("unknown")).await.unwrap();
        assert!(looked_up.is_none());
    }

    /// After `remove`, the key no longer resolves.
    #[tokio::test]
    async fn remove_clears_mapping() {
        let registry = InMemoryProcessRegistry::new();
        let tenant = tid();

        registry
            .register(tenant, &key("k1"), make_identity())
            .await
            .unwrap();
        registry.remove(tenant, &key("k1")).await.unwrap();

        let looked_up = registry.lookup(tenant, &key("k1")).await.unwrap();
        assert!(looked_up.is_none());
    }

    /// Registering the same key twice keeps only the latest identity.
    #[tokio::test]
    async fn upsert_overwrites_existing() {
        let registry = InMemoryProcessRegistry::new();
        let tenant = tid();
        let first = make_identity();
        let second = make_identity();

        registry.register(tenant, &key("k1"), first).await.unwrap();
        registry
            .register(tenant, &key("k1"), second.clone())
            .await
            .unwrap();

        let stored = registry.lookup(tenant, &key("k1")).await.unwrap().unwrap();
        assert_eq!(stored.process_id, second.process_id);

        let count = registry.len().await.unwrap();
        assert_eq!(count, 1, "upsert must not duplicate the key");
    }

    /// `contains` flips from false to true once the key is registered.
    #[tokio::test]
    async fn contains_matches_register() {
        let registry = InMemoryProcessRegistry::new();
        let tenant = tid();

        let before = registry.contains(tenant, &key("k1")).await.unwrap();
        assert!(!before);

        registry
            .register(tenant, &key("k1"), make_identity())
            .await
            .unwrap();

        let after = registry.contains(tenant, &key("k1")).await.unwrap();
        assert!(after);
    }

    /// A clone of the registry observes registrations made through the original.
    #[tokio::test]
    async fn clone_shares_state() {
        let original = InMemoryProcessRegistry::new();
        let cloned = original.clone();
        let tenant = tid();

        original
            .register(tenant, &key("k1"), make_identity())
            .await
            .unwrap();

        let visible = cloned.contains(tenant, &key("k1")).await.unwrap();
        assert!(visible);
    }

    /// The conversation key is `<sender>:<conversation id>`.
    #[test]
    fn from_conversation_and_sender_key_contains_sender() {
        use crate::ids::ConversationId;

        let conv = ConversationId::new();
        let built = RegistryKey::from_conversation_and_sender(conv, "4012345000023");
        let text = built.as_str();
        let suffix = conv.to_string();

        assert!(text.starts_with("4012345000023:"));
        assert!(text.ends_with(suffix.as_str()));
    }

    /// The same key text under a different tenant is a different entry.
    #[tokio::test]
    async fn tenant_keys_are_isolated() {
        let registry = InMemoryProcessRegistry::new();
        let owner = tid();
        let stranger = tid();

        registry
            .register(owner, &key("k1"), make_identity())
            .await
            .unwrap();

        let seen_by_owner = registry.contains(owner, &key("k1")).await.unwrap();
        assert!(seen_by_owner, "tenant1 must see key");

        let seen_by_stranger = registry.contains(stranger, &key("k1")).await.unwrap();
        assert!(!seen_by_stranger, "tenant2 must not see key");
    }
}