mako_engine/registry.rs
1//! Process routing registry.
2//!
3//! Maps string routing keys to [`ProcessIdentity`] values so inbound
4//! EDIFACT messages can be dispatched to the correct running process without
5//! the caller managing a bespoke routing table.
6//!
7//! # Routing key conventions
8//!
9//! Any stable string that uniquely identifies a process for a given message
10//! type works as a routing key. Common patterns:
11//!
12//! | Message type | Recommended key |
13//! |---|---|
14//! | UTILMD waiting for APERAK | `RegistryKey::from_conversation_and_sender(conversation_id, sender_gln)` |
15//! | Route follow-up by correlation | `RegistryKey::from_correlation(correlation_id)` |
16//! | Direct lookup by process | `RegistryKey::from_process(process_id)` |
17//!
18//! One process may be registered under multiple keys when it handles several
19//! different message types simultaneously.
20//!
21//! # Tenant scoping
22//!
23//! All registry operations are scoped to a `TenantId`. This prevents routing
24//! keys from leaking across tenant boundaries when the engine handles multiple
25//! market participants in a single deployment.
26//!
27//! # Usage
28//!
29//! ```rust,ignore
30//! // After spawning a process, register it under the UTILMD conversation ID + sender GLN:
31//! ctx.registry
32//! .register(tenant_id, &RegistryKey::from_conversation_and_sender(utilmd_conv_id, sender_gln), process.identity())
33//! .await?;
34//!
35//! // When the APERAK arrives, look up by conversation ID + APERAK sender GLN:
36//! let identity = ctx.registry
37//! .lookup(tenant_id, &RegistryKey::from_conversation_and_sender(aperak_conv_id, aperak_sender_gln))
38//! .await?
39//! .ok_or(EngineError::registry("unknown conversation"))?;
40//!
41//! let process = ctx.resume::<SupplierChangeWorkflow>(identity);
42//! process.execute(HandleAperak { .. }).await?;
43//!
44//! // Clean up after process completion:
45//! ctx.registry.remove(tenant_id, &RegistryKey::from_conversation_and_sender(utilmd_conv_id, sender_gln)).await?;
46//! ```
47
48use std::{fmt, sync::Arc};
49
50#[cfg(any(test, feature = "testing"))]
51use std::collections::HashMap;
52#[cfg(any(test, feature = "testing"))]
53use tokio::sync::RwLock;
54
55use crate::{
56 error::EngineError,
57 ids::{ConversationId, CorrelationId, ProcessId, ProcessIdentity, TenantId},
58};
59
60/// Maximum byte length for a [`RegistryKey`] routing key.
61///
62/// Keys beyond this limit are rejected by [`RegistryKey::parse`] to
63/// prevent oversized LSM keys from bloating the `pr/` key namespace in
64/// SlateDB. AS4 `MessageId` values and EDIFACT correlation identifiers are
65/// typically ≤ 36 bytes (UUID); 256 bytes provides ample headroom.
66pub const MAX_REGISTRY_KEY_LEN: usize = 256;
67
68// ── RegistryKey ───────────────────────────────────────────────────────────────
69
70/// A typed routing key for the [`ProcessRegistry`].
71///
72/// Using a newtype instead of a bare `&str` prevents accidental key-format
73/// mismatches (e.g. mixing `conversation_id` and `correlation_id` keys at
74/// different call sites) and makes the key derivation convention explicit at
75/// the type level.
76///
77/// # Named constructors
78///
79/// | Constructor | Use case |
80/// |---|---|
81/// | Constructor | Use case |
82/// |---|---|
83/// | [`from_conversation_and_sender`] | Route inbound APERAK by UTILMD conversation ID + sender GLN |
84/// | [`from_correlation`] | Route follow-up messages by root correlation |
85/// | [`from_process`] | Direct lookup by process instance |
86/// | [`parse`] | Primary validated constructor for runtime-derived keys |
87/// | [`from_static`] | Infallible constructor for compile-time-known string literals |
88///
89/// [`from_conversation_and_sender`]: RegistryKey::from_conversation_and_sender
90/// [`from_correlation`]: RegistryKey::from_correlation
91/// [`from_process`]: RegistryKey::from_process
92/// [`parse`]: RegistryKey::parse
93/// [`from_static`]: RegistryKey::from_static
94#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
95pub struct RegistryKey(Box<str>);
96
97impl RegistryKey {
98 /// Key derived from a conversation ID **and sender GLN**.
99 ///
100 /// This is the correct constructor for UTILMD ↔ APERAK routing in
101 /// multi-market-participant deployments.
102 ///
103 /// # Why sender is required
104 ///
105 /// EDIFACT conversation IDs are assigned by the sender within their own
106 /// numbering space. Two senders may independently assign the same
107 /// conversation-ID string, which would collide if keyed on conversation
108 /// alone. Including the sender GLN as a discriminator makes the key
109 /// globally unique within the tenant's namespace.
110 ///
111 /// # Key format
112 ///
113 /// `"{sender_gln}:{conversation_id}"` — stable, URL-safe, human-readable.
114 #[must_use]
115 pub fn from_conversation_and_sender(id: ConversationId, sender_gln: &str) -> Self {
116 let key = format!("{sender_gln}:{id}");
117 Self(key.into_boxed_str())
118 }
119
120 /// Key derived from a correlation ID (route all messages in the same root trace).
121 #[must_use]
122 pub fn from_correlation(id: CorrelationId) -> Self {
123 Self(id.to_string().into_boxed_str())
124 }
125
126 /// Key derived from a process ID (direct process lookup).
127 #[must_use]
128 pub fn from_process(id: ProcessId) -> Self {
129 Self(id.to_string().into_boxed_str())
130 }
131
132 /// Primary validated constructor for runtime-derived keys.
133 ///
134 /// Returns [`EngineError::Registry`] when `s` contains a NUL byte or
135 /// exceeds [`MAX_REGISTRY_KEY_LEN`] bytes. Use this for all keys derived
136 /// from untrusted input (e.g. EDIFACT `MessageId`, AS4 `conversation_id`,
137 /// or user-supplied strings).
138 ///
139 /// # Errors
140 ///
141 /// - [`EngineError::Registry`] when `s` contains `\0`.
142 /// - [`EngineError::Registry`] when `s.len() > MAX_REGISTRY_KEY_LEN`.
143 pub fn parse(s: &str) -> Result<Self, EngineError> {
144 if s.contains('\0') {
145 return Err(EngineError::registry(
146 "registry key must not contain NUL bytes",
147 ));
148 }
149 if s.len() > MAX_REGISTRY_KEY_LEN {
150 return Err(EngineError::registry(format!(
151 "registry key is {} bytes, exceeds maximum of {MAX_REGISTRY_KEY_LEN}",
152 s.len()
153 )));
154 }
155 Ok(Self(s.into()))
156 }
157
158 /// Infallible constructor for **compile-time-known** string literals.
159 ///
160 /// Panics at runtime if `s` contains a NUL byte or exceeds
161 /// [`MAX_REGISTRY_KEY_LEN`] — but since this is only correct to call with
162 /// string literals, any violation would be caught immediately in tests.
163 ///
164 /// # Panics
165 ///
166 /// Panics when `s` contains a NUL byte or exceeds [`MAX_REGISTRY_KEY_LEN`]
167 /// bytes. **Only use this with string literals.** Use [`parse`] for any
168 /// value that may be runtime-derived.
169 ///
170 /// [`parse`]: RegistryKey::parse
171 #[must_use]
172 pub fn from_static(s: &'static str) -> Self {
173 assert!(
174 !s.contains('\0'),
175 "RegistryKey::from_static: key must not contain NUL bytes"
176 );
177 assert!(
178 s.len() <= MAX_REGISTRY_KEY_LEN,
179 "RegistryKey::from_static: key exceeds MAX_REGISTRY_KEY_LEN ({MAX_REGISTRY_KEY_LEN} bytes)"
180 );
181 Self(s.into())
182 }
183
184 /// The raw key string.
185 #[must_use]
186 pub fn as_str(&self) -> &str {
187 &self.0
188 }
189}
190
191impl fmt::Display for RegistryKey {
192 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
193 f.write_str(&self.0)
194 }
195}
196
197impl std::str::FromStr for RegistryKey {
198 type Err = EngineError;
199
200 fn from_str(s: &str) -> Result<Self, Self::Err> {
201 RegistryKey::parse(s)
202 }
203}
204
205// ── ProcessRegistry ───────────────────────────────────────────────────────────
206
207/// Routes inbound messages to their target processes by string key.
208///
209/// A `ProcessRegistry` decouples message routing from process creation.
210/// Register a [`ProcessIdentity`] under a stable key at process creation
211/// time, then look it up by that key when routing subsequent inbound messages.
212///
213/// All operations are scoped to a [`TenantId`] so routing entries from
214/// different market participants cannot collide.
215///
216/// ## Correlated-process lookup (1:many)
217///
218/// The standard `register`/`lookup` API maps a key 1:1 to a single process.
219/// For cases where multiple processes share a common business identifier —
220/// for example, all MSCONS measurement-data processes for a single MaLo ID
221/// in MABIS billing aggregation — use the correlated index:
222///
223/// - [`register_correlated`]: associate a `(tenant, tag, process_id)` triple.
224/// - [`lookup_correlated`]: retrieve **all** `ProcessIdentity` values for a tag.
225/// - [`remove_correlated`]: remove a single process from the tag's fan-out set.
226///
227/// The tag is an arbitrary opaque string (e.g. a MaLo ID such as
228/// `"DE0001234567890"`). Key validation rules match [`RegistryKey`].
229///
230/// ```rust,ignore
231/// // Register all MSCONS processes for a Bilanzkreis MaLo:
232/// for process in &mscons_processes {
233/// ctx.registry
234/// .register_correlated(tenant_id, malo_id, process.process_id(), process.identity())
235/// .await?;
236/// }
237///
238/// // Retrieve all processes for billing aggregation:
239/// let identities = ctx.registry
240/// .lookup_correlated(tenant_id, malo_id)
241/// .await?;
242/// ```
243///
244/// ## Blanket `Arc` implementation
245///
246/// `Arc<S>` implements `ProcessRegistry` whenever `S: ProcessRegistry`,
247/// enabling shared access from multiple concurrent message handlers.
248///
249/// [`register_correlated`]: ProcessRegistry::register_correlated
250/// [`lookup_correlated`]: ProcessRegistry::lookup_correlated
251/// [`remove_correlated`]: ProcessRegistry::remove_correlated
252#[allow(async_fn_in_trait)]
253pub trait ProcessRegistry: Send + Sync {
254 /// Associate `key` with `identity` for the given `tenant_id`.
255 ///
256 /// Overwrites any existing mapping for the `(tenant_id, key)` pair
257 /// (upsert semantics).
258 ///
259 /// # Errors
260 ///
261 /// Returns [`EngineError::Registry`] on storage failure.
262 async fn register(
263 &self,
264 tenant_id: TenantId,
265 key: &RegistryKey,
266 identity: ProcessIdentity,
267 ) -> Result<(), EngineError>;
268
269 /// Return the identity associated with `(tenant_id, key)`, or `None`
270 /// if not registered.
271 ///
272 /// # Errors
273 ///
274 /// Returns [`EngineError::Registry`] on storage failure.
275 async fn lookup(
276 &self,
277 tenant_id: TenantId,
278 key: &RegistryKey,
279 ) -> Result<Option<ProcessIdentity>, EngineError>;
280
281 /// Remove the mapping for `(tenant_id, key)`. No-op if not found.
282 ///
283 /// # Errors
284 ///
285 /// Returns [`EngineError::Registry`] on storage failure.
286 async fn remove(&self, tenant_id: TenantId, key: &RegistryKey) -> Result<(), EngineError>;
287
288 /// Return `true` when `(tenant_id, key)` has a registered mapping.
289 ///
290 /// # Errors
291 ///
292 /// Returns [`EngineError::Registry`] on storage failure.
293 async fn contains(&self, tenant_id: TenantId, key: &RegistryKey) -> Result<bool, EngineError> {
294 Ok(self.lookup(tenant_id, key).await?.is_some())
295 }
296
297 /// Total number of registered routing keys across all tenants.
298 ///
299 /// # Errors
300 ///
301 /// Returns [`EngineError::Registry`] on storage failure.
302 async fn len(&self) -> Result<usize, EngineError>;
303
304 /// Return `true` when no routing keys are registered.
305 ///
306 /// # Errors
307 ///
308 /// Returns [`EngineError::Registry`] on storage failure.
309 async fn is_empty(&self) -> Result<bool, EngineError> {
310 Ok(self.len().await? == 0)
311 }
312
313 // ── Correlated (1:many) index ────────────────────────────────────────────
314
315 /// Associate `process_id`/`identity` with the correlation `tag` for the
316 /// given `tenant_id`.
317 ///
318 /// Multiple processes can be registered under the same `(tenant_id, tag)`,
319 /// making `lookup_correlated` return all of them. This is the fan-out
320 /// counterpart to the 1:1 `register`/`lookup` API.
321 ///
322 /// # Tag constraints
323 ///
324 /// Same validation as [`RegistryKey`]: must not contain `\0`, must be
325 /// ≤ [`MAX_REGISTRY_KEY_LEN`] bytes.
326 ///
327 /// # Errors
328 ///
329 /// Returns [`EngineError::Registry`] on storage failure or invalid tag.
330 async fn register_correlated(
331 &self,
332 tenant_id: TenantId,
333 tag: &str,
334 process_id: crate::ids::ProcessId,
335 identity: ProcessIdentity,
336 ) -> Result<(), EngineError>;
337
338 /// Return all `ProcessIdentity` values registered under `(tenant_id, tag)`.
339 ///
340 /// Returns an empty `Vec` when no entries exist for the tag.
341 ///
342 /// # Errors
343 ///
344 /// Returns [`EngineError::Registry`] on storage failure.
345 async fn lookup_correlated(
346 &self,
347 tenant_id: TenantId,
348 tag: &str,
349 ) -> Result<Vec<ProcessIdentity>, EngineError>;
350
351 /// Remove the `process_id` entry from the `(tenant_id, tag)` fan-out set.
352 ///
353 /// No-op when the entry does not exist.
354 ///
355 /// # Errors
356 ///
357 /// Returns [`EngineError::Registry`] on storage failure.
358 async fn remove_correlated(
359 &self,
360 tenant_id: TenantId,
361 tag: &str,
362 process_id: crate::ids::ProcessId,
363 ) -> Result<(), EngineError>;
364}
365
366// ── Arc<S> blanket impl ───────────────────────────────────────────────────────
367
368impl<S: ProcessRegistry> ProcessRegistry for Arc<S> {
369 async fn register(
370 &self,
371 tenant_id: TenantId,
372 key: &RegistryKey,
373 identity: ProcessIdentity,
374 ) -> Result<(), EngineError> {
375 self.as_ref().register(tenant_id, key, identity).await
376 }
377
378 async fn lookup(
379 &self,
380 tenant_id: TenantId,
381 key: &RegistryKey,
382 ) -> Result<Option<ProcessIdentity>, EngineError> {
383 self.as_ref().lookup(tenant_id, key).await
384 }
385
386 async fn remove(&self, tenant_id: TenantId, key: &RegistryKey) -> Result<(), EngineError> {
387 self.as_ref().remove(tenant_id, key).await
388 }
389
390 async fn len(&self) -> Result<usize, EngineError> {
391 self.as_ref().len().await
392 }
393
394 async fn register_correlated(
395 &self,
396 tenant_id: TenantId,
397 tag: &str,
398 process_id: crate::ids::ProcessId,
399 identity: ProcessIdentity,
400 ) -> Result<(), EngineError> {
401 self.as_ref()
402 .register_correlated(tenant_id, tag, process_id, identity)
403 .await
404 }
405
406 async fn lookup_correlated(
407 &self,
408 tenant_id: TenantId,
409 tag: &str,
410 ) -> Result<Vec<ProcessIdentity>, EngineError> {
411 self.as_ref().lookup_correlated(tenant_id, tag).await
412 }
413
414 async fn remove_correlated(
415 &self,
416 tenant_id: TenantId,
417 tag: &str,
418 process_id: crate::ids::ProcessId,
419 ) -> Result<(), EngineError> {
420 self.as_ref()
421 .remove_correlated(tenant_id, tag, process_id)
422 .await
423 }
424}
425
426// ── NoopProcessRegistry ───────────────────────────────────────────────────────
427
428/// A [`ProcessRegistry`] that never stores any mappings.
429///
430/// Every `lookup` returns `None`. Use this as the default when routing is
431/// managed externally or not required.
432///
433/// # ⚠️ Routing loss warning
434///
435/// `NoopProcessRegistry` **discards every routing registration silently**.
436/// All `lookup` calls return `None`. Inbound messages for existing processes
437/// will not be routed. Do not use in production when message routing is needed.
438///
439/// This type is available in all build configurations so it can serve as a
440/// default type parameter in [`EngineBuilder`]. However, [`EngineBuilder::new`]
441/// (which wires this as the default) is only available with the `testing`
442/// feature or in `cfg(test)`. Production binaries must call
443/// [`EngineBuilder::with_stores`] instead.
444///
445/// [`EngineBuilder`]: crate::builder::EngineBuilder
446/// [`EngineBuilder::new`]: crate::builder::EngineBuilder::new
447/// [`EngineBuilder::with_stores`]: crate::builder::EngineBuilder::with_stores
448#[derive(Debug, Clone, Copy, Default)]
449#[must_use = "NoopProcessRegistry discards all routing registrations silently — use a persistent ProcessRegistry in production"]
450pub struct NoopProcessRegistry;
451
452#[cfg(any(test, feature = "testing"))]
453impl ProcessRegistry for NoopProcessRegistry {
454 async fn register(
455 &self,
456 _tenant_id: TenantId,
457 _key: &RegistryKey,
458 _identity: ProcessIdentity,
459 ) -> Result<(), EngineError> {
460 Ok(())
461 }
462
463 async fn lookup(
464 &self,
465 _tenant_id: TenantId,
466 _key: &RegistryKey,
467 ) -> Result<Option<ProcessIdentity>, EngineError> {
468 Ok(None)
469 }
470
471 async fn remove(&self, _tenant_id: TenantId, _key: &RegistryKey) -> Result<(), EngineError> {
472 Ok(())
473 }
474
475 async fn len(&self) -> Result<usize, EngineError> {
476 Ok(0)
477 }
478
479 async fn register_correlated(
480 &self,
481 _tenant_id: TenantId,
482 _tag: &str,
483 _process_id: crate::ids::ProcessId,
484 _identity: ProcessIdentity,
485 ) -> Result<(), EngineError> {
486 Ok(())
487 }
488
489 async fn lookup_correlated(
490 &self,
491 _tenant_id: TenantId,
492 _tag: &str,
493 ) -> Result<Vec<ProcessIdentity>, EngineError> {
494 Ok(vec![])
495 }
496
497 async fn remove_correlated(
498 &self,
499 _tenant_id: TenantId,
500 _tag: &str,
501 _process_id: crate::ids::ProcessId,
502 ) -> Result<(), EngineError> {
503 Ok(())
504 }
505}
506
507// ── InMemoryProcessRegistry ───────────────────────────────────────────────────
508
509/// An in-memory [`ProcessRegistry`] for tests and development.
510///
511/// Backed by a `HashMap<(TenantId, String), ProcessIdentity>` protected by a
512/// `RwLock`. Cloning shares the underlying data via `Arc` — all clones see the
513/// same mappings.
514///
515/// Use this with [`EngineContext`] to verify message routing without
516/// depending on an external registry service.
517///
518/// Only available in `#[cfg(test)]` or with the `testing` feature enabled.
519///
520/// [`EngineContext`]: crate::builder::EngineContext
521#[cfg(any(test, feature = "testing"))]
522#[derive(Debug, Default, Clone)]
523pub struct InMemoryProcessRegistry {
524 #[allow(clippy::type_complexity)]
525 inner: Arc<RwLock<HashMap<(TenantId, Box<str>), ProcessIdentity>>>,
526 /// Correlated 1:many index: `(tenant_id, tag, process_id)` → `ProcessIdentity`.
527 #[allow(clippy::type_complexity)]
528 correlated: Arc<RwLock<HashMap<(TenantId, Box<str>, crate::ids::ProcessId), ProcessIdentity>>>,
529}
530
531#[cfg(any(test, feature = "testing"))]
532impl InMemoryProcessRegistry {
533 /// Create an empty registry.
534 #[must_use]
535 pub fn new() -> Self {
536 Self::default()
537 }
538
539 /// Return `true` when no routing keys are registered.
540 pub async fn is_empty_async(&self) -> bool {
541 self.inner.read().await.is_empty()
542 }
543}
544
545#[cfg(any(test, feature = "testing"))]
546impl ProcessRegistry for InMemoryProcessRegistry {
547 async fn register(
548 &self,
549 tenant_id: TenantId,
550 key: &RegistryKey,
551 identity: ProcessIdentity,
552 ) -> Result<(), EngineError> {
553 self.inner
554 .write()
555 .await
556 .insert((tenant_id, key.0.clone()), identity);
557 Ok(())
558 }
559
560 async fn lookup(
561 &self,
562 tenant_id: TenantId,
563 key: &RegistryKey,
564 ) -> Result<Option<ProcessIdentity>, EngineError> {
565 Ok(self
566 .inner
567 .read()
568 .await
569 .get(&(tenant_id, key.0.clone()))
570 .cloned())
571 }
572
573 async fn remove(&self, tenant_id: TenantId, key: &RegistryKey) -> Result<(), EngineError> {
574 self.inner.write().await.remove(&(tenant_id, key.0.clone()));
575 Ok(())
576 }
577
578 async fn len(&self) -> Result<usize, EngineError> {
579 Ok(self.inner.read().await.len())
580 }
581
582 async fn register_correlated(
583 &self,
584 tenant_id: TenantId,
585 tag: &str,
586 process_id: crate::ids::ProcessId,
587 identity: ProcessIdentity,
588 ) -> Result<(), EngineError> {
589 self.correlated
590 .write()
591 .await
592 .insert((tenant_id, tag.into(), process_id), identity);
593 Ok(())
594 }
595
596 async fn lookup_correlated(
597 &self,
598 tenant_id: TenantId,
599 tag: &str,
600 ) -> Result<Vec<ProcessIdentity>, EngineError> {
601 let guard = self.correlated.read().await;
602 let result = guard
603 .iter()
604 .filter(|((tid, t, _), _)| *tid == tenant_id && t.as_ref() == tag)
605 .map(|(_, identity)| identity.clone())
606 .collect();
607 Ok(result)
608 }
609
610 async fn remove_correlated(
611 &self,
612 tenant_id: TenantId,
613 tag: &str,
614 process_id: crate::ids::ProcessId,
615 ) -> Result<(), EngineError> {
616 self.correlated
617 .write()
618 .await
619 .remove(&(tenant_id, tag.into(), process_id));
620 Ok(())
621 }
622}
623
624#[cfg(test)]
625mod tests {
626 use super::*;
627 use crate::{
628 ids::{ProcessId, TenantId},
629 version::WorkflowId,
630 };
631
632 fn make_identity() -> ProcessIdentity {
633 let pid = ProcessId::new();
634 ProcessIdentity::new(
635 pid,
636 TenantId::new(),
637 WorkflowId::new("test", "FV2024-10-01"),
638 )
639 }
640
641 fn tid() -> TenantId {
642 TenantId::new()
643 }
644
645 fn key(s: &str) -> RegistryKey {
646 RegistryKey::parse(s).expect("valid test key")
647 }
648
649 #[tokio::test]
650 async fn register_and_lookup() {
651 let reg = InMemoryProcessRegistry::new();
652 let tenant = tid();
653 let id = make_identity();
654 reg.register(tenant, &key("conv:abc"), id.clone())
655 .await
656 .unwrap();
657 let found = reg
658 .lookup(tenant, &key("conv:abc"))
659 .await
660 .unwrap()
661 .expect("must be found");
662 assert_eq!(found.process_id, id.process_id);
663 }
664
665 #[tokio::test]
666 async fn lookup_returns_none_for_unknown_key() {
667 let reg = InMemoryProcessRegistry::new();
668 assert!(reg.lookup(tid(), &key("unknown")).await.unwrap().is_none());
669 }
670
671 #[tokio::test]
672 async fn remove_clears_mapping() {
673 let reg = InMemoryProcessRegistry::new();
674 let tenant = tid();
675 let id = make_identity();
676 reg.register(tenant, &key("k1"), id).await.unwrap();
677 reg.remove(tenant, &key("k1")).await.unwrap();
678 assert!(reg.lookup(tenant, &key("k1")).await.unwrap().is_none());
679 }
680
681 #[tokio::test]
682 async fn upsert_overwrites_existing() {
683 let reg = InMemoryProcessRegistry::new();
684 let tenant = tid();
685 let id1 = make_identity();
686 let id2 = make_identity();
687 reg.register(tenant, &key("k1"), id1).await.unwrap();
688 reg.register(tenant, &key("k1"), id2.clone()).await.unwrap();
689 let found = reg.lookup(tenant, &key("k1")).await.unwrap().unwrap();
690 assert_eq!(found.process_id, id2.process_id);
691 assert_eq!(
692 reg.len().await.unwrap(),
693 1,
694 "upsert must not duplicate the key"
695 );
696 }
697
698 #[tokio::test]
699 async fn contains_matches_register() {
700 let reg = InMemoryProcessRegistry::new();
701 let tenant = tid();
702 assert!(!reg.contains(tenant, &key("k1")).await.unwrap());
703 reg.register(tenant, &key("k1"), make_identity())
704 .await
705 .unwrap();
706 assert!(reg.contains(tenant, &key("k1")).await.unwrap());
707 }
708
709 #[tokio::test]
710 async fn clone_shares_state() {
711 let reg1 = InMemoryProcessRegistry::new();
712 let reg2 = reg1.clone();
713 let tenant = tid();
714 reg1.register(tenant, &key("k1"), make_identity())
715 .await
716 .unwrap();
717 assert!(reg2.contains(tenant, &key("k1")).await.unwrap());
718 }
719
720 /// `from_conversation_and_sender` key contains sender prefix.
721 #[test]
722 fn from_conversation_and_sender_key_contains_sender() {
723 use crate::ids::ConversationId;
724 let conv = ConversationId::new();
725 let k = RegistryKey::from_conversation_and_sender(conv, "4012345000023");
726 assert!(k.as_str().starts_with("4012345000023:"));
727 assert!(k.as_str().ends_with(&conv.to_string()));
728 }
729
730 #[tokio::test]
731 async fn tenant_keys_are_isolated() {
732 let reg = InMemoryProcessRegistry::new();
733 let t1 = tid();
734 let t2 = tid();
735 reg.register(t1, &key("k1"), make_identity()).await.unwrap();
736 assert!(
737 reg.contains(t1, &key("k1")).await.unwrap(),
738 "tenant1 must see key"
739 );
740 assert!(
741 !reg.contains(t2, &key("k1")).await.unwrap(),
742 "tenant2 must not see key"
743 );
744 }
745}