Module migration

In-flight process state migration across BDEW format-version boundaries.

When an old BDEW format version (FV) is removed from the adapter registry after its grace period, any process initiated under that FV can no longer receive new events, because the ForwardCompatible policy rejects FV mismatches at dispatch time. StateMigration and MigrationRunner provide the tooling to advance those processes to a newer FV before the old FV is retired.
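To make the failure mode concrete, the following is a minimal sketch of a dispatch-time version check. `FormatVersion`, `AdapterRegistry`, `DispatchError`, and `dispatch` are hypothetical stand-ins written for illustration and are not the mako_engine API; the only behavior taken from the text above is that events for a process whose FV is no longer registered are rejected.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a BDEW format version tag such as "FV2024".
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct FormatVersion(&'static str);

/// Hypothetical adapter registry: the set of format versions still supported.
struct AdapterRegistry {
    registered: HashSet<FormatVersion>,
}

#[derive(Debug, PartialEq)]
enum DispatchError {
    /// The process was initiated under a format version that has been retired.
    UnsupportedFormatVersion(FormatVersion),
}

impl AdapterRegistry {
    fn new(versions: &[FormatVersion]) -> Self {
        Self { registered: versions.iter().cloned().collect() }
    }

    /// Accepts an event only if the owning process's format version
    /// is still present in the registry (assumed policy behavior).
    fn dispatch(&self, process_fv: &FormatVersion) -> Result<(), DispatchError> {
        if self.registered.contains(process_fv) {
            Ok(())
        } else {
            Err(DispatchError::UnsupportedFormatVersion(process_fv.clone()))
        }
    }
}
```

Under this assumption, a process started under the old FV keeps working while both FVs are registered and starts failing as soon as the old FV is removed, which is why migration has to happen in between.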

§How it works

  1. Scan all event streams via EventStore::list_streams.
  2. Filter streams whose first event carries workflow_id == migration.source_workflow_id().
  3. Replay each matched stream using FromWorkflow::apply to reconstruct the fully-folded state.
  4. Migrate the state via StateMigration::migrate.
  5. Snapshot the migrated state under target_workflow_id’s schema version so the process can continue executing under the new workflow definition without replaying old incompatible events.
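The five steps above can be sketched against an in-memory store. This is an illustration under stated assumptions, not the MigrationRunner implementation: `Event`, `StateV1`, `StateV2`, `Snapshot`, `apply`, `migrate`, and `run_migration` are hypothetical names, the store is a plain `HashMap`, and the failure rule in `migrate` is invented only to show per-stream error collection.

```rust
use std::collections::HashMap;

/// Hypothetical event carrying the workflow id it was recorded under.
#[derive(Clone, Debug)]
struct Event {
    workflow_id: &'static str,
    delta: i64,
}

/// Hypothetical old and new state shapes.
#[derive(Debug, Default, PartialEq)]
struct StateV1 {
    total: i64,
}

#[derive(Debug, PartialEq)]
struct StateV2 {
    total: i64,
    migrated: bool,
}

/// Hypothetical snapshot: the state plus the workflow id it was written under.
#[derive(Debug, PartialEq)]
struct Snapshot {
    workflow_id: &'static str,
    state: StateV2,
}

/// Step 3: fold one event into the old state.
fn apply(mut state: StateV1, event: &Event) -> StateV1 {
    state.total += event.delta;
    state
}

/// Step 4: convert old state to new state; may fail for an individual stream.
fn migrate(state: StateV1) -> Result<StateV2, String> {
    if state.total < 0 {
        return Err(format!("negative total {}", state.total));
    }
    Ok(StateV2 { total: state.total, migrated: true })
}

/// Runs steps 1-5 and returns the written snapshots plus per-stream errors.
fn run_migration(
    streams: &HashMap<&'static str, Vec<Event>>,
    source: &'static str,
    target: &'static str,
) -> (HashMap<&'static str, Snapshot>, Vec<(&'static str, String)>) {
    let mut snapshots = HashMap::new();
    let mut errors = Vec::new();
    // Step 1: scan every stream in the store.
    for (stream_id, events) in streams {
        // Step 2: keep only streams whose first event belongs to the source workflow.
        match events.first() {
            Some(first) if first.workflow_id == source => {}
            _ => continue,
        }
        // Step 3: replay the stream to rebuild the fully folded old state.
        let folded = events.iter().fold(StateV1::default(), apply);
        // Steps 4 and 5: migrate, then snapshot under the target workflow id.
        match migrate(folded) {
            Ok(state) => {
                snapshots.insert(*stream_id, Snapshot { workflow_id: target, state });
            }
            Err(e) => errors.push((*stream_id, e)),
        }
    }
    (snapshots, errors)
}
```

One failing stream does not abort the run in this sketch; its error is recorded and the remaining streams are still processed, which matches the idea of inspecting a report for errors afterwards.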

§Deployment sequence

  1. Deploy the new binary with both FVs still registered in the adapter registry.
  2. Run MigrationRunner::run_and_update_registry(&registry). All in-flight processes are migrated and their routing-table entries are rewritten to use the new workflow_id. Inspect MigrationReport for errors before moving on.
  3. Remove the old FV from the adapter registry and redeploy.

Note: MigrationRunner::run_and_update_registry handles the ProcessRegistry update automatically (updating ProcessIdentity.workflow_id for every primary process-keyed entry). For conversation- or correlation-keyed entries (which are short-lived and self-expire) no action is needed. If you only want the snapshot migration without registry updates, use the simpler MigrationRunner::run() instead.
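The registry update described in the note can be pictured roughly as follows. This is a sketch under assumptions, not the ProcessRegistry implementation: `RoutingKey`, the shape of `ProcessIdentity`, and `update_registry` are hypothetical, and the registry is modeled as a plain `HashMap`. Only process-keyed entries are rewritten; conversation- and correlation-keyed entries are left to expire.

```rust
use std::collections::HashMap;

/// Hypothetical routing key kinds, mirroring the three entry types named above.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum RoutingKey {
    Process(String),
    Conversation(String),
    Correlation(String),
}

/// Hypothetical shape of a registry entry.
#[derive(Clone, Debug, PartialEq)]
struct ProcessIdentity {
    process_id: String,
    workflow_id: String,
}

/// Rewrites `workflow_id` on process-keyed entries of migrated processes.
/// Returns the number of entries that were changed.
fn update_registry(
    registry: &mut HashMap<RoutingKey, ProcessIdentity>,
    migrated: &[&str],
    source: &str,
    target: &str,
) -> usize {
    let mut updated = 0;
    for (key, identity) in registry.iter_mut() {
        // Conversation- and correlation-keyed entries are skipped on purpose.
        let is_process_key = matches!(key, RoutingKey::Process(_));
        let was_migrated = migrated
            .iter()
            .any(|id| *id == identity.process_id.as_str());
        if is_process_key && was_migrated && identity.workflow_id == source {
            identity.workflow_id = target.to_string();
            updated += 1;
        }
    }
    updated
}
```

Restricting the rewrite to processes that were actually migrated keeps a failed stream routed to its old workflow, so it can be investigated before the old FV is removed.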

§Example

use mako_engine::{
    migration::{MigrationRunner, StateMigration},
    version::WorkflowId,
};

struct SupplierChangeFv2024ToFv2025;

impl StateMigration for SupplierChangeFv2024ToFv2025 {
    type FromWorkflow = GpkeSupplierChangeWorkflowFv2024;
    type ToWorkflow   = GpkeSupplierChangeWorkflowFv2025;

    fn source_workflow_id(&self) -> &WorkflowId { &FV2024_WORKFLOW_ID }
    fn target_workflow_id(&self)   -> &WorkflowId { &FV2025_WORKFLOW_ID }

    fn migrate(
        &self,
        state: SupplierChangeStateFv2024,
    ) -> Result<SupplierChangeStateFv2025, String> {
        Ok(SupplierChangeStateFv2025::from_v2024(state))
    }
}

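// Build a runner from the migration plus the event and snapshot stores.
let runner = MigrationRunner::new(
    SupplierChangeFv2024ToFv2025,
    event_store,
    snap_store,
);
// Snapshot-only migration: run() does not update the process registry.
let report = runner.run().await;
// Fail loudly if any stream could not be migrated.
assert!(report.is_ok(), "migration errors: {:?}", report.errors);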

Structs§

MigrationError
Describes a failure to migrate a single process stream.
MigrationReport
Summary produced by a MigrationRunner::run call.
MigrationRunner
Drives a StateMigration over every event stream in a store.

Traits§

StateMigration
A typed, one-directional migration from one workflow version to another.