#![allow(clippy::type_complexity)]
use crate::{
dead_letter::{DeadLetterSink, LogDeadLetterSink},
deadline::{Deadline, DeadlineStore, NoopDeadlineStore},
error::EngineError,
event_store::EventStore,
ids::{ProcessIdentity, TenantId},
marktrolle::DeploymentRoles,
outbox::{NoopOutboxStore, OutboxMessage, OutboxStore},
pid_router::PidRouter,
process::Process,
registry::{NoopProcessRegistry, ProcessRegistry},
snapshot::{NoopSnapshotStore, SnapshotStore},
version::WorkflowId,
workflow::Workflow,
};
use std::sync::Arc;
/// Extension point through which a feature module plugs into the engine.
///
/// Modules are handed to `EngineBuilder::register` and consulted during
/// `EngineBuilder::build`. Every method except [`EngineModule::name`] has a
/// default implementation, so a minimal module only needs to supply a name.
pub trait EngineModule: Send + 'static {
/// Static name of the module; used in build-time panic messages and reported
/// by `EngineContext::registered_modules`.
fn name(&self) -> &'static str;
/// Registers the module's PIDs on `_router`. The default registers nothing.
fn register_pids(&self, _router: &mut PidRouter) {}
/// Role-aware variant of [`EngineModule::register_pids`].
///
/// The default ignores `_roles` and simply delegates to `register_pids`;
/// override it to make registrations depend on the deployment roles.
/// `EngineBuilder::build` calls this method (not `register_pids`) directly.
fn register_pids_with_roles(&self, router: &mut PidRouter, _roles: &DeploymentRoles) {
self.register_pids(router);
}
/// Names of the workflows this module contributes; collected into
/// `EngineContext::registered_workflows`. The default is an empty list.
fn workflow_names(&self) -> &'static [&'static str] {
&[]
}
/// Profile requirements checked at build time when a profile validator is
/// installed on the builder. The default declares no requirements.
fn profile_requirements(&self) -> &'static [crate::profile::ProfileRequirement] {
&[]
}
/// Validates the module's own configuration.
///
/// Returning `Err(message)` makes `EngineBuilder::build` panic with that
/// message. The default accepts unconditionally.
fn configure(&self) -> Result<(), String> {
Ok(())
}
}
/// Fully assembled engine: the configured stores, the dead-letter sink, the PID
/// router and the names of all registered modules and workflows.
///
/// Only the event store type `ES` has to be named explicitly; every other type
/// parameter defaults to its `Noop*` implementation.
pub struct EngineContext<
ES,
SS = NoopSnapshotStore,
OS = NoopOutboxStore,
DS = NoopDeadlineStore,
PR = NoopProcessRegistry,
> {
// Wrapped in `Arc` so that every spawned/resumed `Process` can share it.
event_store: Arc<ES>,
snapshot_store: SS,
outbox_store: OS,
deadline_store: DS,
registry: PR,
/// Sink receiving messages that are given up on; also handed to the outbox worker.
pub dead_letter_sink: Arc<dyn DeadLetterSink>,
pid_router: PidRouter,
// Module names in registration order.
registered_modules: Vec<&'static str>,
// Workflow names of all modules, flattened in registration order.
registered_workflows: Vec<&'static str>,
}
/// An [`EngineContext`] where only the event store is chosen and every other
/// store keeps its `Noop*` default type.
pub type MinimalEngine<ES> = EngineContext<ES>;
/// Process factories and read-only accessors, available for every store
/// combination whose `ES` is an [`EventStore`].
impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
where
    ES: EventStore,
{
    /// Creates a process handle for workflow `W` from `tenant_id` and
    /// `workflow_id`, backed by a shared handle to this context's event store.
    #[must_use]
    pub fn spawn<W: Workflow>(
        &self,
        tenant_id: TenantId,
        workflow_id: WorkflowId,
    ) -> Process<W, Arc<ES>> {
        let shared_store = Arc::clone(&self.event_store);
        Process::new(shared_store, tenant_id, workflow_id)
    }

    /// Creates a process handle for an already known `identity`, backed by a
    /// shared handle to this context's event store.
    #[must_use]
    pub fn resume<W: Workflow>(&self, identity: ProcessIdentity) -> Process<W, Arc<ES>> {
        let shared_store = Arc::clone(&self.event_store);
        Process::from_identity(shared_store, identity)
    }

    /// Names of all registered modules, in registration order.
    #[must_use]
    pub fn registered_modules(&self) -> &[&'static str] {
        self.registered_modules.as_slice()
    }

    /// Workflow names contributed by all registered modules.
    #[must_use]
    pub fn registered_workflows(&self) -> &[&'static str] {
        self.registered_workflows.as_slice()
    }

    /// The shared event store handle.
    #[must_use]
    pub fn event_store(&self) -> &Arc<ES> {
        &self.event_store
    }

    /// The configured snapshot store.
    #[must_use]
    pub fn snapshot_store(&self) -> &SS {
        &self.snapshot_store
    }

    /// The configured outbox store.
    #[must_use]
    pub fn outbox_store(&self) -> &OS {
        &self.outbox_store
    }

    /// The configured deadline store.
    #[must_use]
    pub fn deadline_store(&self) -> &DS {
        &self.deadline_store
    }

    /// The configured process registry.
    #[must_use]
    pub fn registry(&self) -> &PR {
        &self.registry
    }

    /// The dead-letter sink shared with background workers.
    #[must_use]
    pub fn dead_letter_sink(&self) -> &Arc<dyn DeadLetterSink> {
        &self.dead_letter_sink
    }

    /// Verifies that no `Noop*` type is plugged in for the outbox store, the
    /// deadline store or the process registry.
    ///
    /// The check is purely name based: it inspects the resolved type names.
    /// The snapshot store is not part of this check.
    ///
    /// # Panics
    ///
    /// Panics if the type name of `OS`, `DS` or `PR` contains `"Noop"`.
    pub fn assert_production_stores(&self) {
        let resolved = [
            ("OutboxStore", std::any::type_name::<OS>()),
            ("DeadlineStore", std::any::type_name::<DS>()),
            ("ProcessRegistry", std::any::type_name::<PR>()),
        ];
        for (trait_name, type_name) in resolved {
            if type_name.contains("Noop") {
                panic!(
                    "makod: Noop{trait_name} is active — \
                     configure a persistent {trait_name} backend in makod.toml. \
                     Type resolved to: {type_name}"
                );
            }
        }
    }

    /// The PID router populated by the registered modules.
    #[must_use]
    pub fn pid_router(&self) -> &PidRouter {
        &self.pid_router
    }
}
/// Transport used by [`OutboxWorker`] to deliver a single outbox message.
///
/// The worker treats `Ok(())` as delivered. An error for which
/// `is_partner_unknown()` holds, or an `EngineError::Serialization`, is treated
/// as permanent; any other error leads to a rescheduled retry.
pub trait As4Sender: Send + Sync + 'static {
/// Sends `msg`; the returned future must be `Send`.
fn send(
&self,
msg: &OutboxMessage,
) -> impl std::future::Future<Output = Result<(), EngineError>> + Send;
}
/// Background worker that drains an [`OutboxStore`] through an [`As4Sender`].
///
/// Created by `EngineContext::run_outbox_worker`; drive it with `run`.
pub struct OutboxWorker<OS: OutboxStore, S: As4Sender> {
store: OS,
sender: S,
// Maximum number of messages requested from the store per poll.
batch_size: usize,
// Sleep duration after a failed or empty poll.
poll_interval: std::time::Duration,
// Messages whose attempt count reached this value are dead-lettered instead of sent.
max_attempts: u32,
dead_letter_sink: std::sync::Arc<dyn crate::dead_letter::DeadLetterSink>,
}
/// Computes a jittered retry delay in whole seconds.
///
/// The jitter window is `BASE_SECS * 2^min(attempt, 5)` capped at `MAX_SECS`,
/// and the delay is `entropy % window`, i.e. a value in `0..window`.
///
/// NOTE(review): the result can be zero seconds, and because the exponent is
/// capped at 5 the window never exceeds 160 s, so the `MAX_SECS` cap is not
/// reached in practice — confirm both are intended.
fn backoff_delay(attempt: u32, entropy: u64) -> std::time::Duration {
    const BASE_SECS: u64 = 5;
    const MAX_SECS: u64 = 300;
    const MAX_EXPONENT: u32 = 5;

    let exponent = attempt.min(MAX_EXPONENT);
    let multiplier = 1u64.wrapping_shl(exponent);
    let window = std::cmp::min(BASE_SECS.saturating_mul(multiplier), MAX_SECS);

    // Guard the modulo against a zero window.
    let jitter_secs = match window {
        0 => 0,
        w => entropy % w,
    };
    std::time::Duration::from_secs(jitter_secs)
}
impl<OS: OutboxStore, S: As4Sender> OutboxWorker<OS, S> {
#[allow(clippy::too_many_lines)]
pub async fn run(self) {
loop {
let batch = match self.store.pending_now(self.batch_size).await {
Ok(b) => b,
Err(e) => {
tracing::warn!(error = %e, "outbox worker: store error polling pending messages (will retry)");
tokio::time::sleep(self.poll_interval).await;
continue;
}
};
if batch.is_empty() {
tokio::time::sleep(self.poll_interval).await;
continue;
}
for msg in batch {
if msg.attempt_count >= self.max_attempts {
tracing::error!(
message_id = %msg.message_id,
message_type = %msg.message_type,
recipient = %msg.recipient,
attempts = msg.attempt_count,
max_attempts = self.max_attempts,
"outbox worker: max delivery attempts reached; dead-lettering message",
);
self.dead_letter_sink.reject(
&crate::dead_letter::DeadLetterReason::OutboxExhausted {
message_id: msg.message_id,
message_type: msg.message_type.to_string(),
recipient: msg.recipient.to_string(),
last_error: format!(
"delivery exhausted after {} attempts",
msg.attempt_count
),
attempts: msg.attempt_count,
},
);
if let Err(e) = self.store.acknowledge(msg.message_id).await {
tracing::error!(
message_id = %msg.message_id,
error = %e,
"outbox worker: acknowledge after exhaust failed; message may reappear",
);
}
continue;
}
match self.sender.send(&msg).await {
Ok(()) => {
if let Err(e) = self.store.acknowledge(msg.message_id).await {
tracing::warn!(
message_id = %msg.message_id,
error = %e,
"outbox worker: acknowledge failed",
);
}
if msg.message_type.as_ref() == "CONTRL" {
let elapsed = time::OffsetDateTime::now_utc() - msg.created_at;
if elapsed > time::Duration::hours(crate::fristen::CONTRL_FRIST_HOURS) {
tracing::warn!(
message_id = %msg.message_id,
elapsed_secs = elapsed.whole_seconds(),
max_secs = crate::fristen::CONTRL_FRIST_HOURS * 3600,
"outbox worker: CONTRL delivered OUTSIDE the 6h Übertragungsfrist \
(CONTRL AHB 1.0 §1.2) — this is a BNetzA compliance violation"
);
}
}
}
Err(ref e)
if e.is_partner_unknown() || matches!(e, EngineError::Serialization(_)) =>
{
tracing::error!(
message_id = %msg.message_id,
message_type = %msg.message_type,
recipient = %msg.recipient,
error = %e,
"outbox worker: permanent send failure; dead-lettering without retry",
);
self.dead_letter_sink.reject(
&crate::dead_letter::DeadLetterReason::OutboxExhausted {
message_id: msg.message_id,
message_type: msg.message_type.to_string(),
recipient: msg.recipient.to_string(),
last_error: e.to_string(),
attempts: msg.attempt_count,
},
);
if let Err(re) = self.store.acknowledge(msg.message_id).await {
tracing::error!(
message_id = %msg.message_id,
error = %re,
"outbox worker: acknowledge after permanent failure failed",
);
}
}
Err(e) => {
let entropy = {
let uuid = msg.message_id.as_uuid();
let bytes = uuid.as_bytes();
u64::from_le_bytes(bytes[8..16].try_into().unwrap())
};
let delay = backoff_delay(msg.attempt_count, entropy);
let retry_at = time::OffsetDateTime::now_utc()
+ time::Duration::try_from(delay).unwrap_or(time::Duration::minutes(5));
tracing::warn!(
message_id = %msg.message_id,
attempt = msg.attempt_count,
max_attempts = self.max_attempts,
retry_in = ?delay,
error = %e,
"outbox worker: send failed; rescheduling with backoff",
);
if let Err(re) = self.store.reschedule(msg.message_id, retry_at).await {
tracing::error!(
message_id = %msg.message_id,
error = %re,
"outbox worker: reschedule failed; message may be stuck",
);
}
}
}
}
}
}
}
impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
where
ES: EventStore,
OS: OutboxStore + Clone,
{
#[must_use]
pub fn run_outbox_worker<S: As4Sender>(
&self,
sender: S,
batch_size: usize,
poll_interval: std::time::Duration,
max_attempts: u32,
) -> OutboxWorker<OS, S> {
OutboxWorker {
store: self.outbox_store.clone(),
sender,
batch_size,
poll_interval,
max_attempts,
dead_letter_sink: self.dead_letter_sink.clone(),
}
}
}
/// Compact `Debug` output: module names, workflow names and the number of
/// PID routes. The stores and the dead-letter sink are not printed.
impl<ES, SS, OS, DS, PR> std::fmt::Debug for EngineContext<ES, SS, OS, DS, PR>
where
    ES: std::fmt::Debug,
    SS: std::fmt::Debug,
    OS: std::fmt::Debug,
    DS: std::fmt::Debug,
    PR: std::fmt::Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let route_count = self.pid_router.len();
        let mut out = f.debug_struct("EngineContext");
        out.field("registered_modules", &self.registered_modules);
        out.field("registered_workflows", &self.registered_workflows);
        out.field("pid_router_len", &route_count);
        // Marks the output as incomplete, since fields were left out.
        out.finish_non_exhaustive()
    }
}
/// [`As4Sender`] that does nothing and reports success for every message.
#[derive(Debug, Clone, Copy, Default)]
#[must_use = "NoopAs4Sender discards all outbound messages silently — use a real AS4 gateway in production"]
pub struct NoopAs4Sender;
impl As4Sender for NoopAs4Sender {
// Returns `Ok(())` without looking at the message.
async fn send(&self, _msg: &OutboxMessage) -> Result<(), EngineError> {
Ok(())
}
}
/// [`As4Sender`] that logs a warning for every message and then reports
/// success without transmitting anything.
#[derive(Debug, Clone, Copy, Default)]
#[must_use = "LogAs4Sender discards all outbound messages — use a real AS4 gateway in production"]
pub struct LogAs4Sender;
impl As4Sender for LogAs4Sender {
// Logs id, type and recipient at WARN level; the message itself is dropped.
async fn send(&self, msg: &OutboxMessage) -> Result<(), EngineError> {
tracing::warn!(
message_id = %msg.message_id,
message_type = %msg.message_type,
recipient = %msg.recipient,
"LogAs4Sender: outbox message dropped — configure a real AS4 gateway for production",
);
Ok(())
}
}
/// Background worker that polls a [`DeadlineStore`] for due deadlines and
/// passes each one to a dispatch callback.
///
/// Created by `EngineContext::run_deadline_scheduler`; drive it with `run`.
pub struct DeadlineScheduler<DS: DeadlineStore> {
store: DS,
// Type-erased async callback invoked once per due deadline.
dispatch: Box<
dyn Fn(
Deadline,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<(), EngineError>> + Send>,
> + Send
+ Sync,
>,
// Maximum number of deadlines requested from the store per poll.
batch_size: usize,
// Sleep duration after a failed or empty poll.
poll_interval: std::time::Duration,
}
impl<DS: DeadlineStore> DeadlineScheduler<DS> {
    /// Runs the deadline dispatch loop; the loop has no exit, so the returned
    /// future only ends when it is dropped or aborted.
    ///
    /// Each pass polls up to `batch_size` due deadlines and dispatches them one
    /// by one:
    ///
    /// * on success the deadline is cancelled in the store;
    /// * on a version conflict it is left in place so a later poll retries it;
    /// * on any other error it is treated as permanently failed and cancelled.
    ///
    /// The scheduler sleeps for `poll_interval` when polling fails, when
    /// nothing is due, and when a whole pass cancelled no deadline at all. The
    /// last case keeps a persistent version conflict or a failing `cancel`
    /// from turning the loop into a busy spin over the same deadlines.
    pub async fn run(self) {
        loop {
            let result = match self.store.due_now(self.batch_size).await {
                Ok(r) => r,
                Err(e) => {
                    tracing::warn!(
                        error = %e,
                        "deadline scheduler: store error polling due deadlines (will retry)",
                    );
                    tokio::time::sleep(self.poll_interval).await;
                    continue;
                }
            };
            if result.deadlines.is_empty() {
                tokio::time::sleep(self.poll_interval).await;
                continue;
            }

            // Set once any deadline was cancelled in the store during this pass.
            let mut progressed = false;

            for deadline in result.deadlines {
                // Capture what is needed for logging and cancelling before the
                // deadline is moved into the dispatcher.
                let id = deadline.deadline_id();
                let label = deadline.label().to_owned();
                let should_cancel = match (self.dispatch)(deadline).await {
                    Ok(()) => true,
                    Err(ref e) if e.is_version_conflict() => {
                        tracing::warn!(
                            deadline_id = %id,
                            label = %label,
                            "deadline scheduler: VersionConflict; will retry on next poll",
                        );
                        false
                    }
                    Err(e) => {
                        tracing::warn!(
                            deadline_id = %id,
                            label = %label,
                            error = %e,
                            "deadline scheduler: dispatch failed (permanent); cancelling",
                        );
                        true
                    }
                };
                if should_cancel {
                    if let Err(e) = self.store.cancel(id).await {
                        tracing::error!(
                            deadline_id = %id,
                            error = %e,
                            "deadline scheduler: cancel failed; deadline may fire again",
                        );
                    } else {
                        progressed = true;
                    }
                }
            }

            // Every deadline of this pass is still in the store: wait before
            // polling again instead of re-dispatching immediately.
            if !progressed {
                tokio::time::sleep(self.poll_interval).await;
            }
        }
    }
}
/// Deadline scheduler construction; requires a cloneable [`DeadlineStore`].
impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
where
    ES: EventStore,
    DS: DeadlineStore + Clone,
{
    /// Builds a [`DeadlineScheduler`] over a clone of this context's deadline
    /// store, with `dispatch` as the per-deadline callback.
    ///
    /// Despite the name, nothing is started here: the caller has to drive the
    /// returned scheduler by awaiting `DeadlineScheduler::run`.
    #[must_use]
    pub fn run_deadline_scheduler<F, Fut>(
        &self,
        dispatch: F,
        batch_size: usize,
        poll_interval: std::time::Duration,
    ) -> DeadlineScheduler<DS>
    where
        F: Fn(Deadline) -> Fut + Send + Sync + 'static,
        Fut: std::future::Future<Output = Result<(), EngineError>> + Send + 'static,
    {
        let store = self.deadline_store.clone();
        // Erase the concrete future type so the scheduler is not generic over it.
        let erased = move |deadline: Deadline| -> std::pin::Pin<
            Box<dyn std::future::Future<Output = Result<(), EngineError>> + Send>,
        > { Box::pin(dispatch(deadline)) };
        DeadlineScheduler {
            store,
            dispatch: Box::new(erased),
            batch_size,
            poll_interval,
        }
    }
}
/// Type-state builder for [`EngineContext`].
///
/// Each `with_*_store` / `with_registry` call swaps the corresponding type
/// parameter. The event store slot starts as `()` and `build` is only
/// available once it holds an `EventStore`.
pub struct EngineBuilder<
ES = (),
SS = NoopSnapshotStore,
OS = NoopOutboxStore,
DS = NoopDeadlineStore,
PR = NoopProcessRegistry,
> {
event_store: ES,
snapshot_store: SS,
outbox_store: OS,
deadline_store: DS,
registry: PR,
dead_letter_sink: Arc<dyn DeadLetterSink>,
// Modules in registration order.
modules: Vec<Box<dyn EngineModule>>,
// Roles passed to every module's `register_pids_with_roles`.
deployment_roles: DeploymentRoles,
// Optional predicate over a requirement's message type; `None` skips the profile check.
profile_validator: Option<Box<dyn Fn(&str) -> bool + Send + Sync>>,
}
/// Test-only default: all `Noop*` stores, a logging dead-letter sink, no
/// modules, `DeploymentRoles::all()` and no profile validator.
///
/// Only compiled for tests or with the `testing` feature.
#[cfg(any(test, feature = "testing"))]
impl Default
for EngineBuilder<
(),
NoopSnapshotStore,
NoopOutboxStore,
NoopDeadlineStore,
NoopProcessRegistry,
>
{
fn default() -> Self {
Self {
// Placeholder until `with_event_store` supplies a real store.
event_store: (),
snapshot_store: NoopSnapshotStore,
outbox_store: NoopOutboxStore,
deadline_store: NoopDeadlineStore,
registry: NoopProcessRegistry,
dead_letter_sink: Arc::new(LogDeadLetterSink),
modules: Vec::new(),
deployment_roles: DeploymentRoles::all(),
profile_validator: None,
}
}
}
/// Test-only constructor; only compiled for tests or with the `testing` feature.
#[cfg(any(test, feature = "testing"))]
impl EngineBuilder {
/// Creates a builder with all-default (`Noop*`) stores; same as `Self::default()`.
#[must_use]
pub fn new() -> Self {
Self::default()
}
}
/// Constructor that is available in every build configuration and takes the
/// outbox store, deadline store and process registry up front.
impl<OS, DS, PR> EngineBuilder<(), NoopSnapshotStore, OS, DS, PR>
where
OS: OutboxStore,
DS: DeadlineStore,
PR: ProcessRegistry,
{
/// Creates a builder from the three given stores.
///
/// The event store slot is still the `()` placeholder and the snapshot store
/// is `NoopSnapshotStore`; the remaining settings are a logging dead-letter
/// sink, no modules, `DeploymentRoles::all()` and no profile validator.
#[must_use]
pub fn with_stores(outbox_store: OS, deadline_store: DS, registry: PR) -> Self {
Self {
event_store: (),
snapshot_store: NoopSnapshotStore,
outbox_store,
deadline_store,
registry,
dead_letter_sink: Arc::new(LogDeadLetterSink),
modules: Vec::new(),
deployment_roles: DeploymentRoles::all(),
profile_validator: None,
}
}
}
/// Configuration methods. The store setters consume the builder and return one
/// with the matching type parameter replaced; all other settings carry over.
impl<ES, SS, OS, DS, PR> EngineBuilder<ES, SS, OS, DS, PR> {
/// Replaces the event store, changing the builder's `ES` type parameter.
#[must_use]
pub fn with_event_store<ES2: EventStore>(
self,
store: ES2,
) -> EngineBuilder<ES2, SS, OS, DS, PR> {
EngineBuilder {
event_store: store,
snapshot_store: self.snapshot_store,
outbox_store: self.outbox_store,
deadline_store: self.deadline_store,
registry: self.registry,
dead_letter_sink: self.dead_letter_sink,
modules: self.modules,
deployment_roles: self.deployment_roles,
profile_validator: self.profile_validator,
}
}
/// Replaces the snapshot store, changing the builder's `SS` type parameter.
#[must_use]
pub fn with_snapshot_store<SS2: SnapshotStore>(
self,
store: SS2,
) -> EngineBuilder<ES, SS2, OS, DS, PR> {
EngineBuilder {
event_store: self.event_store,
snapshot_store: store,
outbox_store: self.outbox_store,
deadline_store: self.deadline_store,
registry: self.registry,
dead_letter_sink: self.dead_letter_sink,
modules: self.modules,
deployment_roles: self.deployment_roles,
profile_validator: self.profile_validator,
}
}
/// Replaces the outbox store, changing the builder's `OS` type parameter.
#[must_use]
pub fn with_outbox_store<OS2: OutboxStore>(
self,
store: OS2,
) -> EngineBuilder<ES, SS, OS2, DS, PR> {
EngineBuilder {
event_store: self.event_store,
snapshot_store: self.snapshot_store,
outbox_store: store,
deadline_store: self.deadline_store,
registry: self.registry,
dead_letter_sink: self.dead_letter_sink,
modules: self.modules,
deployment_roles: self.deployment_roles,
profile_validator: self.profile_validator,
}
}
/// Replaces the deadline store, changing the builder's `DS` type parameter.
#[must_use]
pub fn with_deadline_store<DS2: DeadlineStore>(
self,
store: DS2,
) -> EngineBuilder<ES, SS, OS, DS2, PR> {
EngineBuilder {
event_store: self.event_store,
snapshot_store: self.snapshot_store,
outbox_store: self.outbox_store,
deadline_store: store,
registry: self.registry,
dead_letter_sink: self.dead_letter_sink,
modules: self.modules,
deployment_roles: self.deployment_roles,
profile_validator: self.profile_validator,
}
}
/// Replaces the process registry, changing the builder's `PR` type parameter.
#[must_use]
pub fn with_registry<PR2: ProcessRegistry>(
self,
registry: PR2,
) -> EngineBuilder<ES, SS, OS, DS, PR2> {
EngineBuilder {
event_store: self.event_store,
snapshot_store: self.snapshot_store,
outbox_store: self.outbox_store,
deadline_store: self.deadline_store,
registry,
dead_letter_sink: self.dead_letter_sink,
modules: self.modules,
deployment_roles: self.deployment_roles,
profile_validator: self.profile_validator,
}
}
/// Replaces the dead-letter sink; `sink` is wrapped in an `Arc`.
#[must_use]
pub fn with_dead_letter_sink(mut self, sink: impl DeadLetterSink) -> Self {
self.dead_letter_sink = Arc::new(sink);
self
}
/// Installs a profile validator.
///
/// During `build` the validator is called with the message type of every
/// module profile requirement; a `false` result makes `build` panic.
#[must_use]
pub fn with_profile_validator(
mut self,
validator: impl Fn(&str) -> bool + Send + Sync + 'static,
) -> Self {
self.profile_validator = Some(Box::new(validator));
self
}
/// Appends `module` to the list of registered modules; order is preserved.
#[must_use]
pub fn register(mut self, module: Box<dyn EngineModule>) -> Self {
self.modules.push(module);
self
}
/// Sets the deployment roles passed to every module during PID registration.
#[must_use]
pub fn with_deployment_roles(mut self, roles: DeploymentRoles) -> Self {
self.deployment_roles = roles;
self
}
}
/// Final assembly step; only available once every slot holds a type that
/// implements its store trait.
impl<ES, SS, OS, DS, PR> EngineBuilder<ES, SS, OS, DS, PR>
where
ES: EventStore,
SS: SnapshotStore,
OS: OutboxStore,
DS: DeadlineStore,
PR: ProcessRegistry,
{
/// Validates the configuration and assembles the [`EngineContext`].
///
/// Steps, in order: store sanity checks, per-module configuration and
/// profile validation, PID registration with conflict detection, and
/// collection of module and workflow names.
///
/// # Panics
///
/// * Outside of tests and without the `testing` feature: if the deadline
///   store, outbox store or process registry type name contains the
///   respective `Noop*` name.
/// * If a module's `configure` returns an error.
/// * If a profile validator is installed and rejects a module requirement.
/// * If two modules register the same PID while the deployment roles are
///   not `DeploymentRoles::all()`.
#[must_use]
#[allow(clippy::too_many_lines)]
pub fn build(self) -> EngineContext<ES, SS, OS, DS, PR> {
{
// The checks below are name based: they inspect the resolved type names.
let os_name = std::any::type_name::<OS>();
let ds_name = std::any::type_name::<DS>();
let pr_name = std::any::type_name::<PR>();
// Production builds: refuse to start with a Noop store.
#[cfg(not(any(test, feature = "testing")))]
{
if ds_name.contains("NoopDeadlineStore") {
panic!(
"EngineBuilder::build: NoopDeadlineStore is active in a \
non-testing build. This silently discards all APERAK deadlines, \
which is an immediately reportable BNetzA violation \
(BK6-22-024 §5, BK7-24-01-009). \
Call .with_deadline_store(SlateDbStore::as_deadline_store()) \
in your production engine assembly. \
If this is a test, enable the 'testing' feature."
);
}
if os_name.contains("NoopOutboxStore") {
panic!(
"EngineBuilder::build: NoopOutboxStore is active in a \
non-testing build. This silently discards all outbound \
APERAK, CONTRL, and UTILMD messages. \
Call .with_outbox_store(SlateDbStore::as_outbox_store()) \
in your production engine assembly. \
If this is a test, enable the 'testing' feature."
);
}
if pr_name.contains("NoopProcessRegistry") {
panic!(
"EngineBuilder::build: NoopProcessRegistry is active in a \
non-testing build. This means conversation routing \
(PID → stream_id lookup) is lost on every restart, \
breaking all WiM, GeLi Gas, and GPKE in-flight processes. \
Call .with_registry(SlateDbStore::as_process_registry()) \
in your production engine assembly. \
If this is a test, enable the 'testing' feature."
);
}
}
// Test/testing/tracing builds: only warn about active Noop stores.
#[cfg(any(test, feature = "testing", feature = "tracing"))]
{
let ss_name = std::any::type_name::<SS>();
if ss_name.contains("NoopSnapshotStore") {
tracing::warn!(
store = ss_name,
"EngineBuilder: NoopSnapshotStore is active — snapshots will not be \
persisted. Use SlateDbStore::as_snapshot_store() in production."
);
}
if os_name.contains("NoopOutboxStore") {
tracing::warn!(
store = os_name,
"EngineBuilder: NoopOutboxStore is active — outbound messages will be \
silently discarded. Use SlateDbStore::as_outbox_store() in production."
);
}
if ds_name.contains("NoopDeadlineStore") {
tracing::warn!(
store = ds_name,
"EngineBuilder: NoopDeadlineStore is active — scheduled deadlines will \
not fire after restart. Use SlateDbStore::as_deadline_store() in production."
);
}
if pr_name.contains("NoopProcessRegistry") {
tracing::warn!(
store = pr_name,
"EngineBuilder: NoopProcessRegistry is active — process routing will be \
lost on restart. Use SlateDbStore::as_process_registry() in production."
);
}
}
}
// Per-module validation: configuration first, then profile requirements.
for module in &self.modules {
if let Err(msg) = module.configure() {
panic!(
"EngineBuilder::build: module '{}' failed configuration validation: {}",
module.name(),
msg
);
}
// Profile requirements are only checked when a validator was installed.
if let Some(ref validator) = self.profile_validator {
for req in module.profile_requirements() {
assert!(
validator(req.message_type),
"EngineBuilder::build: module '{}' requires an active edi-energy \
profile for '{}' ({}) but none is registered for today's date. \
Run `cargo xtask codegen` to add the missing profile.",
module.name(),
req.message_type,
req.label,
);
}
}
}
let mut pid_router = PidRouter::new();
// Maps each PID to the name of the module that most recently claimed it.
let mut pid_owners: std::collections::HashMap<u32, &str> = std::collections::HashMap::new();
for module in &self.modules {
// Register into a throw-away router first to learn which PIDs this
// module claims, so overlaps can be detected per module.
let mut scratch = PidRouter::new();
module.register_pids_with_roles(&mut scratch, &self.deployment_roles);
for pid in scratch.registered_pids() {
if let Some(prev) = pid_owners.insert(pid, module.name()) {
// With `DeploymentRoles::all()` overlaps are tolerated; with
// explicit roles they abort the build.
if self.deployment_roles.is_all() {
#[cfg(feature = "tracing")]
tracing::debug!(
pid,
previous_module = prev,
current_module = module.name(),
"PID registered by multiple modules with DeploymentRoles::all(); \
last module wins (use with_deployment_roles for strict routing)",
);
// Keeps `prev` used when the `tracing` feature is disabled.
let _ = prev; } else {
panic!(
"EngineBuilder::build: PID {pid} is claimed by both \
'{prev}' and '{}' — overlapping PID registrations are \
not allowed with explicit DeploymentRoles; each PID must be \
owned by exactly one module.\n \
Hint: use EngineBuilder::with_deployment_roles to restrict \
role-conditional PIDs so only one module registers each PID.\n \
Example: with_deployment_roles(DeploymentRoles::nb()) ensures \
GPKE registers ORDRSP 19001/19002, not WiM.",
module.name(),
);
}
}
}
// Second call performs the real registration on the shared router.
module.register_pids_with_roles(&mut pid_router, &self.deployment_roles);
}
let registered_modules = self.modules.iter().map(|m| m.name()).collect();
let registered_workflows = self
.modules
.iter()
.flat_map(|m| m.workflow_names().iter().copied())
.collect();
EngineContext {
event_store: Arc::new(self.event_store),
snapshot_store: self.snapshot_store,
outbox_store: self.outbox_store,
deadline_store: self.deadline_store,
registry: self.registry,
dead_letter_sink: self.dead_letter_sink,
pid_router,
registered_modules,
registered_workflows,
}
}
}
// Unit tests for `EngineBuilder` and `EngineContext`, using the in-memory stores.
#[cfg(test)]
mod tests {
use super::*;
use crate::{
deadline::InMemoryDeadlineStore,
error::WorkflowError,
event_store::InMemoryEventStore,
ids::TenantId,
outbox::InMemoryOutboxStore,
pid_router::PidRouter,
registry::InMemoryProcessRegistry,
snapshot::InMemorySnapshotStore,
version::WorkflowId,
workflow::{CommandPayload, EventPayload, Workflow},
};
// Minimal workflow fixture: one event type, one command, stateless state.
#[derive(serde::Serialize, serde::Deserialize)]
struct PingEvent;
impl EventPayload for PingEvent {
fn event_type(&self) -> &'static str {
"Ping"
}
}
struct PingCommand;
impl CommandPayload for PingCommand {}
#[derive(Default, Clone)]
struct PingState;
struct PingWorkflow;
impl Workflow for PingWorkflow {
type State = PingState;
type Event = PingEvent;
type Command = PingCommand;
// Events do not change the state.
fn apply(state: PingState, _: &PingEvent) -> PingState {
state
}
// Every command produces exactly one `PingEvent`.
fn handle(
_: &PingState,
_: PingCommand,
) -> Result<crate::workflow::WorkflowOutput<PingEvent>, WorkflowError> {
Ok(vec![PingEvent].into())
}
}
// Module that relies on every default of `EngineModule`.
struct TestModule;
impl EngineModule for TestModule {
fn name(&self) -> &'static str {
"test-module"
}
}
// A builder with only an event store builds and has no modules.
#[test]
fn build_with_event_store_only() {
let ctx = EngineBuilder::new()
.with_event_store(InMemoryEventStore::new())
.build();
assert!(ctx.registered_modules().is_empty());
}
// All stores replaced plus one module: the module name is reported.
#[test]
fn build_with_all_stores_and_module() {
let ctx = EngineBuilder::new()
.with_event_store(InMemoryEventStore::new())
.with_snapshot_store(InMemorySnapshotStore::new())
.with_outbox_store(InMemoryOutboxStore::new())
.with_deadline_store(InMemoryDeadlineStore::new())
.with_registry(InMemoryProcessRegistry::new())
.register(Box::new(TestModule))
.build();
assert_eq!(ctx.registered_modules(), &["test-module"]);
}
// Module names are reported in registration order.
#[test]
fn multiple_modules_ordered() {
struct ModA;
impl EngineModule for ModA {
fn name(&self) -> &'static str {
"mod-a"
}
}
struct ModB;
impl EngineModule for ModB {
fn name(&self) -> &'static str {
"mod-b"
}
}
let ctx = EngineBuilder::new()
.with_event_store(InMemoryEventStore::new())
.register(Box::new(ModA))
.register(Box::new(ModB))
.build();
assert_eq!(ctx.registered_modules(), &["mod-a", "mod-b"]);
}
// Two spawned processes get distinct process ids.
#[tokio::test]
async fn spawn_creates_independent_processes() {
let ctx = EngineBuilder::new()
.with_event_store(InMemoryEventStore::new())
.build();
let wf_id = WorkflowId::new("ping", "FV2024-10-01");
let p1 = ctx.spawn::<PingWorkflow>(TenantId::new(), wf_id.clone());
let p2 = ctx.spawn::<PingWorkflow>(TenantId::new(), wf_id);
assert_ne!(p1.process_id(), p2.process_id());
}
// A resumed process sees the event appended through the original handle.
#[tokio::test]
async fn resume_sees_previously_appended_events() {
let store = InMemoryEventStore::new();
let ctx = EngineBuilder::new().with_event_store(store).build();
let p = ctx.spawn::<PingWorkflow>(TenantId::new(), WorkflowId::new("ping", "FV2024-10-01"));
p.execute(PingCommand).await.unwrap();
let identity = p.identity();
let resumed = ctx.resume::<PingWorkflow>(identity);
assert_eq!(resumed.event_count().await.unwrap(), 1);
}
// Register an identity under a conversation key, look it up, resume from it.
#[tokio::test]
async fn registry_routes_process_via_conversation_key() {
use crate::registry::RegistryKey;
let ctx = EngineBuilder::new()
.with_event_store(InMemoryEventStore::new())
.with_registry(InMemoryProcessRegistry::new())
.build();
let p = ctx.spawn::<PingWorkflow>(TenantId::new(), WorkflowId::new("ping", "FV2024-10-01"));
let tenant = p.tenant_id();
let conv_key = RegistryKey::parse("conv:test-conversation-123").expect("valid key");
ctx.registry()
.register(tenant, &conv_key, p.identity())
.await
.unwrap();
let found = ctx
.registry()
.lookup(tenant, &conv_key)
.await
.unwrap()
.expect("must be registered");
let resumed = ctx.resume::<PingWorkflow>(found);
assert_eq!(resumed.process_id(), p.process_id());
}
// PIDs registered through the role-agnostic `register_pids` reach the router.
#[test]
fn pid_router_populated_by_module_register_pids() {
struct PidModule;
impl EngineModule for PidModule {
fn name(&self) -> &'static str {
"pid-module"
}
fn register_pids(&self, router: &mut PidRouter) {
router.register(55001, "gpke-supplier-change");
router.register(55002, "gpke-supplier-change");
}
}
let ctx = EngineBuilder::new()
.with_event_store(InMemoryEventStore::new())
.register(Box::new(PidModule))
.build();
assert_eq!(ctx.pid_router().route(55001), Some("gpke-supplier-change"));
assert_eq!(ctx.pid_router().route(55002), Some("gpke-supplier-change"));
assert!(ctx.pid_router().route(99999).is_none());
assert_eq!(ctx.pid_router().len(), 2);
}
// Role-conditional registration: which module owns PID 19001 depends on the roles.
#[test]
fn register_pids_with_roles_gates_pids_correctly() {
use crate::marktrolle::{DeploymentRoles, Marktrolle};
struct ModuleA;
impl EngineModule for ModuleA {
fn name(&self) -> &'static str {
"module-a"
}
fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
if roles.contains(Marktrolle::Nb) {
router.register(19_001, "workflow-a");
}
}
}
struct ModuleB;
impl EngineModule for ModuleB {
fn name(&self) -> &'static str {
"module-b"
}
fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
// Registers only for an explicit role set that contains Nmsb.
if !roles.is_all() && roles.contains(Marktrolle::Nmsb) {
router.register(19_001, "workflow-b");
router.register(19_015, "workflow-b");
}
}
}
let build = |roles: DeploymentRoles| {
EngineBuilder::new()
.with_event_store(InMemoryEventStore::new())
.with_deployment_roles(roles)
.register(Box::new(ModuleA))
.register(Box::new(ModuleB))
.build()
};
let ctx = build(DeploymentRoles::all());
assert_eq!(ctx.pid_router().route(19_001), Some("workflow-a"));
assert!(ctx.pid_router().route(19_015).is_none());
let ctx = build(DeploymentRoles::nb());
assert_eq!(ctx.pid_router().route(19_001), Some("workflow-a"));
assert!(ctx.pid_router().route(19_015).is_none());
let ctx = build(DeploymentRoles::nmsb());
assert_eq!(ctx.pid_router().route(19_001), Some("workflow-b"));
assert_eq!(ctx.pid_router().route(19_015), Some("workflow-b"));
}
// With explicit roles covering both modules, the shared PID must abort the build.
#[test]
#[should_panic(expected = "overlapping PID registrations")]
fn register_pids_with_roles_conflict_panics_with_explicit_roles() {
use crate::marktrolle::{DeploymentRoles, Marktrolle};
struct ConflictA;
impl EngineModule for ConflictA {
fn name(&self) -> &'static str {
"conflict-a"
}
fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
if roles.contains(Marktrolle::Nb) {
router.register(19_001, "workflow-a");
}
}
}
struct ConflictB;
impl EngineModule for ConflictB {
fn name(&self) -> &'static str {
"conflict-b"
}
fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
if !roles.is_all() && roles.contains(Marktrolle::Nmsb) {
router.register(19_001, "workflow-b"); }
}
}
let _ = EngineBuilder::new()
.with_event_store(InMemoryEventStore::new())
.with_deployment_roles(DeploymentRoles::from_roles([
Marktrolle::Nb,
Marktrolle::Nmsb,
]))
.register(Box::new(ConflictA))
.register(Box::new(ConflictB))
.build();
}
}