Skip to main content

mako_engine/
registry.rs

1//! Process routing registry.
2//!
3//! Maps string routing keys to [`ProcessIdentity`] values so inbound
4//! EDIFACT messages can be dispatched to the correct running process without
5//! the caller managing a bespoke routing table.
6//!
7//! # Routing key conventions
8//!
9//! Any stable string that uniquely identifies a process for a given message
10//! type works as a routing key. Common patterns:
11//!
12//! | Message type | Recommended key |
13//! |---|---|
14//! | UTILMD waiting for APERAK | `RegistryKey::from_conversation_and_sender(conversation_id, sender_gln)` |
15//! | Route follow-up by correlation | `RegistryKey::from_correlation(correlation_id)` |
16//! | Direct lookup by process | `RegistryKey::from_process(process_id)` |
17//!
18//! One process may be registered under multiple keys when it handles several
19//! different message types simultaneously.
20//!
21//! # Tenant scoping
22//!
23//! All registry operations are scoped to a `TenantId`. This prevents routing
24//! keys from leaking across tenant boundaries when the engine handles multiple
25//! market participants in a single deployment.
26//!
27//! # Usage
28//!
29//! ```rust,ignore
30//! // After spawning a process, register it under the UTILMD conversation ID + sender GLN:
31//! ctx.registry
32//!     .register(tenant_id, &RegistryKey::from_conversation_and_sender(utilmd_conv_id, sender_gln), process.identity())
33//!     .await?;
34//!
35//! // When the APERAK arrives, look up by conversation ID + APERAK sender GLN:
36//! let identity = ctx.registry
37//!     .lookup(tenant_id, &RegistryKey::from_conversation_and_sender(aperak_conv_id, aperak_sender_gln))
38//!     .await?
39//!     .ok_or(EngineError::registry("unknown conversation"))?;
40//!
41//! let process = ctx.resume::<SupplierChangeWorkflow>(identity);
42//! process.execute(HandleAperak { .. }).await?;
43//!
44//! // Clean up after process completion:
45//! ctx.registry.remove(tenant_id, &RegistryKey::from_conversation_and_sender(utilmd_conv_id, sender_gln)).await?;
46//! ```
47
48use std::{fmt, sync::Arc};
49
50#[cfg(any(test, feature = "testing"))]
51use std::collections::HashMap;
52#[cfg(any(test, feature = "testing"))]
53use tokio::sync::RwLock;
54
55use crate::{
56    error::EngineError,
57    ids::{ConversationId, CorrelationId, ProcessId, ProcessIdentity, TenantId},
58};
59
60/// Maximum byte length for a [`RegistryKey`] routing key.
61///
62/// Keys beyond this limit are rejected by [`RegistryKey::parse`] to
63/// prevent oversized LSM keys from bloating the `pr/` key namespace in
64/// SlateDB. AS4 `MessageId` values and EDIFACT correlation identifiers are
65/// typically ≤ 36 bytes (UUID); 256 bytes provides ample headroom.
66pub const MAX_REGISTRY_KEY_LEN: usize = 256;
67
68// ── RegistryKey ───────────────────────────────────────────────────────────────
69
70/// A typed routing key for the [`ProcessRegistry`].
71///
72/// Using a newtype instead of a bare `&str` prevents accidental key-format
73/// mismatches (e.g. mixing `conversation_id` and `correlation_id` keys at
74/// different call sites) and makes the key derivation convention explicit at
75/// the type level.
76///
77/// # Named constructors
78///
79/// | Constructor | Use case |
80/// |---|---|
81/// | Constructor | Use case |
82/// |---|---|
83/// | [`from_conversation_and_sender`] | Route inbound APERAK by UTILMD conversation ID + sender GLN |
84/// | [`from_correlation`] | Route follow-up messages by root correlation |
85/// | [`from_process`] | Direct lookup by process instance |
86/// | [`parse`] | Primary validated constructor for runtime-derived keys |
87/// | [`from_static`] | Infallible constructor for compile-time-known string literals |
88///
89/// [`from_conversation_and_sender`]: RegistryKey::from_conversation_and_sender
90/// [`from_correlation`]: RegistryKey::from_correlation
91/// [`from_process`]: RegistryKey::from_process
92/// [`parse`]: RegistryKey::parse
93/// [`from_static`]: RegistryKey::from_static
94#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
95pub struct RegistryKey(Box<str>);
96
97impl RegistryKey {
98    /// Key derived from a conversation ID **and sender GLN**.
99    ///
100    /// This is the correct constructor for UTILMD ↔ APERAK routing in
101    /// multi-market-participant deployments.
102    ///
103    /// # Why sender is required
104    ///
105    /// EDIFACT conversation IDs are assigned by the sender within their own
106    /// numbering space.  Two senders may independently assign the same
107    /// conversation-ID string, which would collide if keyed on conversation
108    /// alone.  Including the sender GLN as a discriminator makes the key
109    /// globally unique within the tenant's namespace.
110    ///
111    /// # Key format
112    ///
113    /// `"{sender_gln}:{conversation_id}"` — stable, URL-safe, human-readable.
114    #[must_use]
115    pub fn from_conversation_and_sender(id: ConversationId, sender_gln: &str) -> Self {
116        let key = format!("{sender_gln}:{id}");
117        Self(key.into_boxed_str())
118    }
119
120    /// Key derived from a correlation ID (route all messages in the same root trace).
121    #[must_use]
122    pub fn from_correlation(id: CorrelationId) -> Self {
123        Self(id.to_string().into_boxed_str())
124    }
125
126    /// Key derived from a process ID (direct process lookup).
127    #[must_use]
128    pub fn from_process(id: ProcessId) -> Self {
129        Self(id.to_string().into_boxed_str())
130    }
131
132    /// Primary validated constructor for runtime-derived keys.
133    ///
134    /// Returns [`EngineError::Registry`] when `s` contains a NUL byte or
135    /// exceeds [`MAX_REGISTRY_KEY_LEN`] bytes. Use this for all keys derived
136    /// from untrusted input (e.g. EDIFACT `MessageId`, AS4 `conversation_id`,
137    /// or user-supplied strings).
138    ///
139    /// # Errors
140    ///
141    /// - [`EngineError::Registry`] when `s` contains `\0`.
142    /// - [`EngineError::Registry`] when `s.len() > MAX_REGISTRY_KEY_LEN`.
143    pub fn parse(s: &str) -> Result<Self, EngineError> {
144        if s.contains('\0') {
145            return Err(EngineError::registry(
146                "registry key must not contain NUL bytes",
147            ));
148        }
149        if s.len() > MAX_REGISTRY_KEY_LEN {
150            return Err(EngineError::registry(format!(
151                "registry key is {} bytes, exceeds maximum of {MAX_REGISTRY_KEY_LEN}",
152                s.len()
153            )));
154        }
155        Ok(Self(s.into()))
156    }
157
158    /// Infallible constructor for **compile-time-known** string literals.
159    ///
160    /// Panics at runtime if `s` contains a NUL byte or exceeds
161    /// [`MAX_REGISTRY_KEY_LEN`] — but since this is only correct to call with
162    /// string literals, any violation would be caught immediately in tests.
163    ///
164    /// # Panics
165    ///
166    /// Panics when `s` contains a NUL byte or exceeds [`MAX_REGISTRY_KEY_LEN`]
167    /// bytes. **Only use this with string literals.** Use [`parse`] for any
168    /// value that may be runtime-derived.
169    ///
170    /// [`parse`]: RegistryKey::parse
171    #[must_use]
172    pub fn from_static(s: &'static str) -> Self {
173        assert!(
174            !s.contains('\0'),
175            "RegistryKey::from_static: key must not contain NUL bytes"
176        );
177        assert!(
178            s.len() <= MAX_REGISTRY_KEY_LEN,
179            "RegistryKey::from_static: key exceeds MAX_REGISTRY_KEY_LEN ({MAX_REGISTRY_KEY_LEN} bytes)"
180        );
181        Self(s.into())
182    }
183
184    /// The raw key string.
185    #[must_use]
186    pub fn as_str(&self) -> &str {
187        &self.0
188    }
189}
190
191impl fmt::Display for RegistryKey {
192    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
193        f.write_str(&self.0)
194    }
195}
196
197impl std::str::FromStr for RegistryKey {
198    type Err = EngineError;
199
200    fn from_str(s: &str) -> Result<Self, Self::Err> {
201        RegistryKey::parse(s)
202    }
203}
204
205// ── ProcessRegistry ───────────────────────────────────────────────────────────
206
207/// Routes inbound messages to their target processes by string key.
208///
209/// A `ProcessRegistry` decouples message routing from process creation.
210/// Register a [`ProcessIdentity`] under a stable key at process creation
211/// time, then look it up by that key when routing subsequent inbound messages.
212///
213/// All operations are scoped to a [`TenantId`] so routing entries from
214/// different market participants cannot collide.
215///
216/// ## Correlated-process lookup (1:many)
217///
218/// The standard `register`/`lookup` API maps a key 1:1 to a single process.
219/// For cases where multiple processes share a common business identifier —
220/// for example, all MSCONS measurement-data processes for a single MaLo ID
221/// in MABIS billing aggregation — use the correlated index:
222///
223/// - [`register_correlated`]: associate a `(tenant, tag, process_id)` triple.
224/// - [`lookup_correlated`]: retrieve **all** `ProcessIdentity` values for a tag.
225/// - [`remove_correlated`]: remove a single process from the tag's fan-out set.
226///
227/// The tag is an arbitrary opaque string (e.g. a MaLo ID such as
228/// `"DE0001234567890"`). Key validation rules match [`RegistryKey`].
229///
230/// ```rust,ignore
231/// // Register all MSCONS processes for a Bilanzkreis MaLo:
232/// for process in &mscons_processes {
233///     ctx.registry
234///         .register_correlated(tenant_id, malo_id, process.process_id(), process.identity())
235///         .await?;
236/// }
237///
238/// // Retrieve all processes for billing aggregation:
239/// let identities = ctx.registry
240///     .lookup_correlated(tenant_id, malo_id)
241///     .await?;
242/// ```
243///
244/// ## Blanket `Arc` implementation
245///
246/// `Arc<S>` implements `ProcessRegistry` whenever `S: ProcessRegistry`,
247/// enabling shared access from multiple concurrent message handlers.
248///
249/// [`register_correlated`]: ProcessRegistry::register_correlated
250/// [`lookup_correlated`]:   ProcessRegistry::lookup_correlated
251/// [`remove_correlated`]:   ProcessRegistry::remove_correlated
252#[allow(async_fn_in_trait)]
253pub trait ProcessRegistry: Send + Sync {
254    /// Associate `key` with `identity` for the given `tenant_id`.
255    ///
256    /// Overwrites any existing mapping for the `(tenant_id, key)` pair
257    /// (upsert semantics).
258    ///
259    /// # Errors
260    ///
261    /// Returns [`EngineError::Registry`] on storage failure.
262    async fn register(
263        &self,
264        tenant_id: TenantId,
265        key: &RegistryKey,
266        identity: ProcessIdentity,
267    ) -> Result<(), EngineError>;
268
269    /// Return the identity associated with `(tenant_id, key)`, or `None`
270    /// if not registered.
271    ///
272    /// # Errors
273    ///
274    /// Returns [`EngineError::Registry`] on storage failure.
275    async fn lookup(
276        &self,
277        tenant_id: TenantId,
278        key: &RegistryKey,
279    ) -> Result<Option<ProcessIdentity>, EngineError>;
280
281    /// Remove the mapping for `(tenant_id, key)`. No-op if not found.
282    ///
283    /// # Errors
284    ///
285    /// Returns [`EngineError::Registry`] on storage failure.
286    async fn remove(&self, tenant_id: TenantId, key: &RegistryKey) -> Result<(), EngineError>;
287
288    /// Return `true` when `(tenant_id, key)` has a registered mapping.
289    ///
290    /// # Errors
291    ///
292    /// Returns [`EngineError::Registry`] on storage failure.
293    async fn contains(&self, tenant_id: TenantId, key: &RegistryKey) -> Result<bool, EngineError> {
294        Ok(self.lookup(tenant_id, key).await?.is_some())
295    }
296
297    /// Total number of registered routing keys across all tenants.
298    ///
299    /// # Errors
300    ///
301    /// Returns [`EngineError::Registry`] on storage failure.
302    async fn len(&self) -> Result<usize, EngineError>;
303
304    /// Return `true` when no routing keys are registered.
305    ///
306    /// # Errors
307    ///
308    /// Returns [`EngineError::Registry`] on storage failure.
309    async fn is_empty(&self) -> Result<bool, EngineError> {
310        Ok(self.len().await? == 0)
311    }
312
313    // ── Correlated (1:many) index ────────────────────────────────────────────
314
315    /// Associate `process_id`/`identity` with the correlation `tag` for the
316    /// given `tenant_id`.
317    ///
318    /// Multiple processes can be registered under the same `(tenant_id, tag)`,
319    /// making `lookup_correlated` return all of them. This is the fan-out
320    /// counterpart to the 1:1 `register`/`lookup` API.
321    ///
322    /// # Tag constraints
323    ///
324    /// Same validation as [`RegistryKey`]: must not contain `\0`, must be
325    /// ≤ [`MAX_REGISTRY_KEY_LEN`] bytes.
326    ///
327    /// # Errors
328    ///
329    /// Returns [`EngineError::Registry`] on storage failure or invalid tag.
330    async fn register_correlated(
331        &self,
332        tenant_id: TenantId,
333        tag: &str,
334        process_id: crate::ids::ProcessId,
335        identity: ProcessIdentity,
336    ) -> Result<(), EngineError>;
337
338    /// Return all `ProcessIdentity` values registered under `(tenant_id, tag)`.
339    ///
340    /// Returns an empty `Vec` when no entries exist for the tag.
341    ///
342    /// # Errors
343    ///
344    /// Returns [`EngineError::Registry`] on storage failure.
345    async fn lookup_correlated(
346        &self,
347        tenant_id: TenantId,
348        tag: &str,
349    ) -> Result<Vec<ProcessIdentity>, EngineError>;
350
351    /// Remove the `process_id` entry from the `(tenant_id, tag)` fan-out set.
352    ///
353    /// No-op when the entry does not exist.
354    ///
355    /// # Errors
356    ///
357    /// Returns [`EngineError::Registry`] on storage failure.
358    async fn remove_correlated(
359        &self,
360        tenant_id: TenantId,
361        tag: &str,
362        process_id: crate::ids::ProcessId,
363    ) -> Result<(), EngineError>;
364}
365
366// ── Arc<S> blanket impl ───────────────────────────────────────────────────────
367
368impl<S: ProcessRegistry> ProcessRegistry for Arc<S> {
369    async fn register(
370        &self,
371        tenant_id: TenantId,
372        key: &RegistryKey,
373        identity: ProcessIdentity,
374    ) -> Result<(), EngineError> {
375        self.as_ref().register(tenant_id, key, identity).await
376    }
377
378    async fn lookup(
379        &self,
380        tenant_id: TenantId,
381        key: &RegistryKey,
382    ) -> Result<Option<ProcessIdentity>, EngineError> {
383        self.as_ref().lookup(tenant_id, key).await
384    }
385
386    async fn remove(&self, tenant_id: TenantId, key: &RegistryKey) -> Result<(), EngineError> {
387        self.as_ref().remove(tenant_id, key).await
388    }
389
390    async fn len(&self) -> Result<usize, EngineError> {
391        self.as_ref().len().await
392    }
393
394    async fn register_correlated(
395        &self,
396        tenant_id: TenantId,
397        tag: &str,
398        process_id: crate::ids::ProcessId,
399        identity: ProcessIdentity,
400    ) -> Result<(), EngineError> {
401        self.as_ref()
402            .register_correlated(tenant_id, tag, process_id, identity)
403            .await
404    }
405
406    async fn lookup_correlated(
407        &self,
408        tenant_id: TenantId,
409        tag: &str,
410    ) -> Result<Vec<ProcessIdentity>, EngineError> {
411        self.as_ref().lookup_correlated(tenant_id, tag).await
412    }
413
414    async fn remove_correlated(
415        &self,
416        tenant_id: TenantId,
417        tag: &str,
418        process_id: crate::ids::ProcessId,
419    ) -> Result<(), EngineError> {
420        self.as_ref()
421            .remove_correlated(tenant_id, tag, process_id)
422            .await
423    }
424}
425
426// ── NoopProcessRegistry ───────────────────────────────────────────────────────
427
428/// A [`ProcessRegistry`] that never stores any mappings.
429///
430/// Every `lookup` returns `None`. Use this as the default when routing is
431/// managed externally or not required.
432///
433/// # ⚠️ Routing loss warning
434///
435/// `NoopProcessRegistry` **discards every routing registration silently**.
436/// All `lookup` calls return `None`. Inbound messages for existing processes
437/// will not be routed. Do not use in production when message routing is needed.
438///
439/// This type is available in all build configurations so it can serve as a
440/// default type parameter in [`EngineBuilder`]. However, [`EngineBuilder::new`]
441/// (which wires this as the default) is only available with the `testing`
442/// feature or in `cfg(test)`. Production binaries must call
443/// [`EngineBuilder::with_stores`] instead.
444///
445/// [`EngineBuilder`]: crate::builder::EngineBuilder
446/// [`EngineBuilder::new`]: crate::builder::EngineBuilder::new
447/// [`EngineBuilder::with_stores`]: crate::builder::EngineBuilder::with_stores
448#[derive(Debug, Clone, Copy, Default)]
449#[must_use = "NoopProcessRegistry discards all routing registrations silently — use a persistent ProcessRegistry in production"]
450pub struct NoopProcessRegistry;
451
452#[cfg(any(test, feature = "testing"))]
453impl ProcessRegistry for NoopProcessRegistry {
454    async fn register(
455        &self,
456        _tenant_id: TenantId,
457        _key: &RegistryKey,
458        _identity: ProcessIdentity,
459    ) -> Result<(), EngineError> {
460        Ok(())
461    }
462
463    async fn lookup(
464        &self,
465        _tenant_id: TenantId,
466        _key: &RegistryKey,
467    ) -> Result<Option<ProcessIdentity>, EngineError> {
468        Ok(None)
469    }
470
471    async fn remove(&self, _tenant_id: TenantId, _key: &RegistryKey) -> Result<(), EngineError> {
472        Ok(())
473    }
474
475    async fn len(&self) -> Result<usize, EngineError> {
476        Ok(0)
477    }
478
479    async fn register_correlated(
480        &self,
481        _tenant_id: TenantId,
482        _tag: &str,
483        _process_id: crate::ids::ProcessId,
484        _identity: ProcessIdentity,
485    ) -> Result<(), EngineError> {
486        Ok(())
487    }
488
489    async fn lookup_correlated(
490        &self,
491        _tenant_id: TenantId,
492        _tag: &str,
493    ) -> Result<Vec<ProcessIdentity>, EngineError> {
494        Ok(vec![])
495    }
496
497    async fn remove_correlated(
498        &self,
499        _tenant_id: TenantId,
500        _tag: &str,
501        _process_id: crate::ids::ProcessId,
502    ) -> Result<(), EngineError> {
503        Ok(())
504    }
505}
506
507// ── InMemoryProcessRegistry ───────────────────────────────────────────────────
508
509/// An in-memory [`ProcessRegistry`] for tests and development.
510///
511/// Backed by a `HashMap<(TenantId, String), ProcessIdentity>` protected by a
512/// `RwLock`. Cloning shares the underlying data via `Arc` — all clones see the
513/// same mappings.
514///
515/// Use this with [`EngineContext`] to verify message routing without
516/// depending on an external registry service.
517///
518/// Only available in `#[cfg(test)]` or with the `testing` feature enabled.
519///
520/// [`EngineContext`]: crate::builder::EngineContext
521#[cfg(any(test, feature = "testing"))]
522#[derive(Debug, Default, Clone)]
523pub struct InMemoryProcessRegistry {
524    #[allow(clippy::type_complexity)]
525    inner: Arc<RwLock<HashMap<(TenantId, Box<str>), ProcessIdentity>>>,
526    /// Correlated 1:many index: `(tenant_id, tag, process_id)` → `ProcessIdentity`.
527    #[allow(clippy::type_complexity)]
528    correlated: Arc<RwLock<HashMap<(TenantId, Box<str>, crate::ids::ProcessId), ProcessIdentity>>>,
529}
530
531#[cfg(any(test, feature = "testing"))]
532impl InMemoryProcessRegistry {
533    /// Create an empty registry.
534    #[must_use]
535    pub fn new() -> Self {
536        Self::default()
537    }
538
539    /// Return `true` when no routing keys are registered.
540    pub async fn is_empty_async(&self) -> bool {
541        self.inner.read().await.is_empty()
542    }
543}
544
545#[cfg(any(test, feature = "testing"))]
546impl ProcessRegistry for InMemoryProcessRegistry {
547    async fn register(
548        &self,
549        tenant_id: TenantId,
550        key: &RegistryKey,
551        identity: ProcessIdentity,
552    ) -> Result<(), EngineError> {
553        self.inner
554            .write()
555            .await
556            .insert((tenant_id, key.0.clone()), identity);
557        Ok(())
558    }
559
560    async fn lookup(
561        &self,
562        tenant_id: TenantId,
563        key: &RegistryKey,
564    ) -> Result<Option<ProcessIdentity>, EngineError> {
565        Ok(self
566            .inner
567            .read()
568            .await
569            .get(&(tenant_id, key.0.clone()))
570            .cloned())
571    }
572
573    async fn remove(&self, tenant_id: TenantId, key: &RegistryKey) -> Result<(), EngineError> {
574        self.inner.write().await.remove(&(tenant_id, key.0.clone()));
575        Ok(())
576    }
577
578    async fn len(&self) -> Result<usize, EngineError> {
579        Ok(self.inner.read().await.len())
580    }
581
582    async fn register_correlated(
583        &self,
584        tenant_id: TenantId,
585        tag: &str,
586        process_id: crate::ids::ProcessId,
587        identity: ProcessIdentity,
588    ) -> Result<(), EngineError> {
589        self.correlated
590            .write()
591            .await
592            .insert((tenant_id, tag.into(), process_id), identity);
593        Ok(())
594    }
595
596    async fn lookup_correlated(
597        &self,
598        tenant_id: TenantId,
599        tag: &str,
600    ) -> Result<Vec<ProcessIdentity>, EngineError> {
601        let guard = self.correlated.read().await;
602        let result = guard
603            .iter()
604            .filter(|((tid, t, _), _)| *tid == tenant_id && t.as_ref() == tag)
605            .map(|(_, identity)| identity.clone())
606            .collect();
607        Ok(result)
608    }
609
610    async fn remove_correlated(
611        &self,
612        tenant_id: TenantId,
613        tag: &str,
614        process_id: crate::ids::ProcessId,
615    ) -> Result<(), EngineError> {
616        self.correlated
617            .write()
618            .await
619            .remove(&(tenant_id, tag.into(), process_id));
620        Ok(())
621    }
622}
623
624#[cfg(test)]
625mod tests {
626    use super::*;
627    use crate::{
628        ids::{ProcessId, TenantId},
629        version::WorkflowId,
630    };
631
632    fn make_identity() -> ProcessIdentity {
633        let pid = ProcessId::new();
634        ProcessIdentity::new(
635            pid,
636            TenantId::new(),
637            WorkflowId::new("test", "FV2024-10-01"),
638        )
639    }
640
641    fn tid() -> TenantId {
642        TenantId::new()
643    }
644
645    fn key(s: &str) -> RegistryKey {
646        RegistryKey::parse(s).expect("valid test key")
647    }
648
649    #[tokio::test]
650    async fn register_and_lookup() {
651        let reg = InMemoryProcessRegistry::new();
652        let tenant = tid();
653        let id = make_identity();
654        reg.register(tenant, &key("conv:abc"), id.clone())
655            .await
656            .unwrap();
657        let found = reg
658            .lookup(tenant, &key("conv:abc"))
659            .await
660            .unwrap()
661            .expect("must be found");
662        assert_eq!(found.process_id, id.process_id);
663    }
664
665    #[tokio::test]
666    async fn lookup_returns_none_for_unknown_key() {
667        let reg = InMemoryProcessRegistry::new();
668        assert!(reg.lookup(tid(), &key("unknown")).await.unwrap().is_none());
669    }
670
671    #[tokio::test]
672    async fn remove_clears_mapping() {
673        let reg = InMemoryProcessRegistry::new();
674        let tenant = tid();
675        let id = make_identity();
676        reg.register(tenant, &key("k1"), id).await.unwrap();
677        reg.remove(tenant, &key("k1")).await.unwrap();
678        assert!(reg.lookup(tenant, &key("k1")).await.unwrap().is_none());
679    }
680
681    #[tokio::test]
682    async fn upsert_overwrites_existing() {
683        let reg = InMemoryProcessRegistry::new();
684        let tenant = tid();
685        let id1 = make_identity();
686        let id2 = make_identity();
687        reg.register(tenant, &key("k1"), id1).await.unwrap();
688        reg.register(tenant, &key("k1"), id2.clone()).await.unwrap();
689        let found = reg.lookup(tenant, &key("k1")).await.unwrap().unwrap();
690        assert_eq!(found.process_id, id2.process_id);
691        assert_eq!(
692            reg.len().await.unwrap(),
693            1,
694            "upsert must not duplicate the key"
695        );
696    }
697
698    #[tokio::test]
699    async fn contains_matches_register() {
700        let reg = InMemoryProcessRegistry::new();
701        let tenant = tid();
702        assert!(!reg.contains(tenant, &key("k1")).await.unwrap());
703        reg.register(tenant, &key("k1"), make_identity())
704            .await
705            .unwrap();
706        assert!(reg.contains(tenant, &key("k1")).await.unwrap());
707    }
708
709    #[tokio::test]
710    async fn clone_shares_state() {
711        let reg1 = InMemoryProcessRegistry::new();
712        let reg2 = reg1.clone();
713        let tenant = tid();
714        reg1.register(tenant, &key("k1"), make_identity())
715            .await
716            .unwrap();
717        assert!(reg2.contains(tenant, &key("k1")).await.unwrap());
718    }
719
720    /// `from_conversation_and_sender` key contains sender prefix.
721    #[test]
722    fn from_conversation_and_sender_key_contains_sender() {
723        use crate::ids::ConversationId;
724        let conv = ConversationId::new();
725        let k = RegistryKey::from_conversation_and_sender(conv, "4012345000023");
726        assert!(k.as_str().starts_with("4012345000023:"));
727        assert!(k.as_str().ends_with(&conv.to_string()));
728    }
729
730    #[tokio::test]
731    async fn tenant_keys_are_isolated() {
732        let reg = InMemoryProcessRegistry::new();
733        let t1 = tid();
734        let t2 = tid();
735        reg.register(t1, &key("k1"), make_identity()).await.unwrap();
736        assert!(
737            reg.contains(t1, &key("k1")).await.unwrap(),
738            "tenant1 must see key"
739        );
740        assert!(
741            !reg.contains(t2, &key("k1")).await.unwrap(),
742            "tenant2 must not see key"
743        );
744    }
745}