Skip to main content

ProcessRegistry

Trait ProcessRegistry 

Source
pub trait ProcessRegistry: Send + Sync {
    // Required methods
    async fn register(
        &self,
        tenant_id: TenantId,
        key: &RegistryKey,
        identity: ProcessIdentity,
    ) -> Result<(), EngineError>;
    async fn lookup(
        &self,
        tenant_id: TenantId,
        key: &RegistryKey,
    ) -> Result<Option<ProcessIdentity>, EngineError>;
    async fn remove(
        &self,
        tenant_id: TenantId,
        key: &RegistryKey,
    ) -> Result<(), EngineError>;
    async fn len(&self) -> Result<usize, EngineError>;
    async fn register_correlated(
        &self,
        tenant_id: TenantId,
        tag: &str,
        process_id: ProcessId,
        identity: ProcessIdentity,
    ) -> Result<(), EngineError>;
    async fn lookup_correlated(
        &self,
        tenant_id: TenantId,
        tag: &str,
    ) -> Result<Vec<ProcessIdentity>, EngineError>;
    async fn remove_correlated(
        &self,
        tenant_id: TenantId,
        tag: &str,
        process_id: ProcessId,
    ) -> Result<(), EngineError>;

    // Provided methods
    async fn contains(
        &self,
        tenant_id: TenantId,
        key: &RegistryKey,
    ) -> Result<bool, EngineError> { ... }
    async fn is_empty(&self) -> Result<bool, EngineError> { ... }
}
Expand description

Routes inbound messages to their target processes by string key.

A ProcessRegistry decouples message routing from process creation. Register a ProcessIdentity under a stable key at process creation time, then look it up by that key when routing subsequent inbound messages.

All operations are scoped to a TenantId so routing entries from different market participants cannot collide.

§Correlated-process lookup (1:many)

The standard register/lookup API maps a key 1:1 to a single process. For cases where multiple processes share a common business identifier — for example, all MSCONS measurement-data processes for a single MaLo ID in MABIS billing aggregation — use the correlated index:

The tag is an arbitrary opaque string (e.g. a MaLo ID such as "DE0001234567890"). Key validation rules match RegistryKey.

// Register all MSCONS processes for a Bilanzkreis MaLo:
for process in &mscons_processes {
    ctx.registry
        .register_correlated(tenant_id, malo_id, process.process_id(), process.identity())
        .await?;
}

// Retrieve all processes for billing aggregation:
let identities = ctx.registry
    .lookup_correlated(tenant_id, malo_id)
    .await?;

§Blanket Arc implementation

Arc<S> implements ProcessRegistry whenever S: ProcessRegistry, enabling shared access from multiple concurrent message handlers.

Required Methods§

Source

async fn register( &self, tenant_id: TenantId, key: &RegistryKey, identity: ProcessIdentity, ) -> Result<(), EngineError>

Associate key with identity for the given tenant_id.

Overwrites any existing mapping for the (tenant_id, key) pair (upsert semantics).

§Errors

Returns EngineError::Registry on storage failure.

Source

async fn lookup( &self, tenant_id: TenantId, key: &RegistryKey, ) -> Result<Option<ProcessIdentity>, EngineError>

Return the identity associated with (tenant_id, key), or None if not registered.

§Errors

Returns EngineError::Registry on storage failure.

Source

async fn remove( &self, tenant_id: TenantId, key: &RegistryKey, ) -> Result<(), EngineError>

Remove the mapping for (tenant_id, key). No-op if not found.

§Errors

Returns EngineError::Registry on storage failure.

Source

async fn len(&self) -> Result<usize, EngineError>

Total number of registered routing keys across all tenants.

§Errors

Returns EngineError::Registry on storage failure.

Source

async fn register_correlated( &self, tenant_id: TenantId, tag: &str, process_id: ProcessId, identity: ProcessIdentity, ) -> Result<(), EngineError>

Associate process_id/identity with the correlation tag for the given tenant_id.

Multiple processes can be registered under the same (tenant_id, tag), making lookup_correlated return all of them. This is the fan-out counterpart to the 1:1 register/lookup API.

§Tag constraints

Same validation as RegistryKey: must not contain \0, must be ≤ MAX_REGISTRY_KEY_LEN bytes.

§Errors

Returns EngineError::Registry on storage failure or invalid tag.

Source

async fn lookup_correlated( &self, tenant_id: TenantId, tag: &str, ) -> Result<Vec<ProcessIdentity>, EngineError>

Return all ProcessIdentity values registered under (tenant_id, tag).

Returns an empty Vec when no entries exist for the tag.

§Errors

Returns EngineError::Registry on storage failure.

Source

async fn remove_correlated( &self, tenant_id: TenantId, tag: &str, process_id: ProcessId, ) -> Result<(), EngineError>

Remove the process_id entry from the (tenant_id, tag) fan-out set.

No-op when the entry does not exist.

§Errors

Returns EngineError::Registry on storage failure.

Provided Methods§

Source

async fn contains( &self, tenant_id: TenantId, key: &RegistryKey, ) -> Result<bool, EngineError>

Return true when (tenant_id, key) has a registered mapping.

§Errors

Returns EngineError::Registry on storage failure.

Source

async fn is_empty(&self) -> Result<bool, EngineError>

Return true when no routing keys are registered.

§Errors

Returns EngineError::Registry on storage failure.

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety".

Implementations on Foreign Types§

Source§

impl<S: ProcessRegistry> ProcessRegistry for Arc<S>

Source§

async fn register( &self, tenant_id: TenantId, key: &RegistryKey, identity: ProcessIdentity, ) -> Result<(), EngineError>

Source§

async fn lookup( &self, tenant_id: TenantId, key: &RegistryKey, ) -> Result<Option<ProcessIdentity>, EngineError>

Source§

async fn remove( &self, tenant_id: TenantId, key: &RegistryKey, ) -> Result<(), EngineError>

Source§

async fn len(&self) -> Result<usize, EngineError>

Source§

async fn register_correlated( &self, tenant_id: TenantId, tag: &str, process_id: ProcessId, identity: ProcessIdentity, ) -> Result<(), EngineError>

Source§

async fn lookup_correlated( &self, tenant_id: TenantId, tag: &str, ) -> Result<Vec<ProcessIdentity>, EngineError>

Source§

async fn remove_correlated( &self, tenant_id: TenantId, tag: &str, process_id: ProcessId, ) -> Result<(), EngineError>

Implementors§